Apache Kafka: 设置和运行的逐步指南
今天的计算系统每天都会产生数百万条数据记录。这些数据包括您的金融交易、下订单的数据或汽车传感器的数据。要实时处理这些数据流事件,并可靠地在不同的企业系统之间移动事件记录,您需要使用apache kafka。
apache kafka是一个处理每秒超过100万条记录的开源数据流解决方案。除了高吞吐量外,apache kafka还提供高可扩展性和可用性、低延迟和永久存储。
linkedin、uber和netflix等公司都依赖apache kafka进行实时处理和数据流。开始使用apache kafka的最简单方法是在本地机器上启动它。这样不仅可以看到apache kafka服务器的运行情况,还可以生产和消费消息。
通过亲自体验启动服务器、创建主题并使用kafka客户端编写java代码,您将能够使用apache kafka满足所有数据流水线需求。
如何在本地机器上下载apache kafka
您可以从官方链接下载最新版本的apache kafka。下载的内容将以.tgz
格式压缩。下载完成后,您需要解压缩。
如果您使用linux,打开终端。然后导航到您下载apache kafka压缩版本的位置。运行以下命令:
tar -xzvf kafka_2.13-3.5.0.tgz
命令完成后,您将发现一个名为kafka_2.13-3.5.0
的新目录。使用以下命令进入该目录:
cd kafka_2.13-3.5.0
现在,您可以使用ls
命令列出该目录的内容。
对于windows用户,您可以按照相同的步骤进行操作。如果找不到tar
命令,可以使用第三方工具如winzip打开压缩文件。
如何在本地机器上启动apache kafka
在下载并解压apache kafka之后,现在是时候开始运行它了。它不需要任何安装程序。您可以直接通过命令行或终端窗口使用它。
在开始使用apache kafka之前,请确保您的系统已安装java 8+。apache kafka需要运行中的java安装。
#1. 运行apache zookeeper服务器
第一步是运行apache zookeeper。您可以作为归档的一部分预先下载它。它是一个负责维护配置并为其他服务提供同步的服务。
一旦您进入已解压缩归档内容的目录,运行以下命令:
对于linux用户:
bin/zookeeper-server-start.sh config/zookeeper.properties
对于windows用户:
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
《zookeeper.properties》文件提供了运行apache zookeeper服务器的配置。您可以配置诸如数据存储的本地目录和服务器运行的端口之类的属性。
#2. 启动apache kafka服务器
既然已经启动了apache zookeeper服务器,现在是时候启动apache kafka服务器了。
打开一个新的终端或命令提示窗口,导航到提取文件所在的目录,然后可以使用以下命令启动apache kafka服务器:
对于linux用户:
bin/kafka-server-start.sh config/server.properties
对于windows用户:
bin/windows/kafka-server-start.bat config/server.properties
您的apache kafka服务器正在运行。如果您想要更改默认配置,可以通过修改《server.properties》文件来实现。不同的值可以在 官方文档 中找到。
如何在本地机器上使用apache kafka
您现在可以在本地机器上使用apache kafka来生产和消费消息。由于apache zookeeper和apache kafka服务器已经运行起来了,让我们看看如何创建第一个主题、生产第一条消息并消费相同的消息。
在apache kafka中创建主题的步骤是什么?
在创建第一个主题之前,让我们了解一下什么是主题。在apache kafka中,主题是一个逻辑数据存储,有助于数据流。可以将其视为从一个组件传输数据的通道。
一个主题支持多个生产者和多个消费者 – 多个系统可以写入和读取主题。与其他消息系统不同,可以多次消费来自主题的任何消息。此外,您还可以指定消息的保留期限。
让我们以一个生产银行交易数据的系统(生产者)和另一个消费此数据并向用户发送应用程序通知的系统(消费者)为例。为了实现这一点,需要一个主题。
打开一个新的终端或命令提示窗口,并导航到您提取档案的目录。以下命令将创建一个名为《transactions》的主题:
对于linux用户:
bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
对于windows用户:
bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092
您现在已经创建了第一个主题,并且准备好开始生产和消费消息了。
如何向apache kafka生产消息?
准备好你的apache kafka主题后,你现在可以产生你的第一条消息。打开一个新的终端窗口或命令提示符窗口,或者使用你用来创建主题的相同窗口。接下来,确保你在你提取存档内容的正确目录中。你可以使用命令行来使用以下命令在主题上产生你的消息:
对于linux用户:
bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
对于windows用户:
bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092
运行命令后,你会看到你的终端或命令提示符窗口正在等待输入。编写你的第一条消息,然后按enter键。
> 这是一条100美元的交易记录
你已经在你的本地机器上向apache kafka产生了你的第一条消息。随后,你现在可以准备消费这条消息了。
如何从apache kafka消费消息?
假设你的主题已经被创建并且你已经向kafka主题中产生了一条消息,那么你现在可以消费那条消息。
apache kafka允许你将多个消费者附加到同一个主题上。每个消费者可以是一个消费者组的一部分 – 一个逻辑标识符。例如,如果你有两个需要消费相同数据的服务,那么它们可以有不同的消费者组。
然而,如果你有两个相同服务的实例,那么你应该避免消费和处理相同的消息两次。在这种情况下,它们都将有相同的消费者组。
在终端或命令提示符窗口中,确保你在正确的目录中。使用以下命令启动消费者:
对于linux用户:
bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
对于windows用户:
bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
你会看到你之前产生的消息出现在你的终端上。你现在已经使用apache kafka消费了你的第一条消息。
kafka-console-consumer
命令接受了许多传递的参数。让我们看看每个参数的含义:
--topic
指定了你要消费的主题--from-beginning
告诉控制台消费者从第一条消息开始读取- 通过
--bootstrap-server
选项来指定你的apache kafka服务器 - 此外,你可以通过传递
--group
参数来指定消费者组 - 在没有消费者组参数的情况下,它会自动生成
使用控制台消费者运行时,你可以尝试产生新的消息。你会看到它们都被消费并出现在你的终端上。
现在,您已经创建了主题并成功地生成和消费了消息,让我们将其与java应用程序集成。
如何使用java创建apache kafka生产者和消费者
在开始之前,请确保在本地计算机上安装了java 8+。apache kafka提供了自己的客户端库,可以让您无缝连接。如果您使用maven来管理依赖项,则将以下依赖项添加到pom.xml
中:
org.apache.kafka
kafka-clients
3.5.0
您还可以从maven存储库中下载该库,并将其添加到您的java类路径中。
安装库之后,打开一个代码编辑器。让我们看看如何使用java启动生产者和消费者。
创建apache kafka java生产者
有了kafka-clients
库,您现在可以开始创建kafka生产者。
让我们创建一个名为simpleproducer.java
的类。这将负责在之前创建的主题上生成消息。在此类中,您将创建一个org.apache.kafka.clients.producer.kafkaproducer
的实例。随后,您将使用此生产者发送消息。
要创建kafka生产者,您需要apache kafka服务器的主机和端口。由于您在本地机器上运行它,主机将为localhost
。假设您在启动服务器时没有更改默认属性,则端口将为9092
。考虑以下代码,它将帮助您创建生产者:
package org.example.kafka;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
import org.apache.kafka.common.serialization.stringserializer;
import java.util.properties;
import java.util.concurrent.executionexception;
import java.util.concurrent.future;
public class simpleproducer {
private final kafkaproducer producer;
public simpleproducer(string host, string port) {
string server = host + ":" + port;
properties properties = new properties();
properties.setproperty(producerconfig.bootstrap_servers_config, server);
properties.setproperty(producerconfig.key_serializer_class_config, stringserializer.class.getname());
properties.setproperty(producerconfig.value_serializer_class_config, stringserializer.class.getname());
this.producer = new kafkaproducer(properties);
}
}
您会注意到有三个属性被设置。让我们快速浏览一下每个属性:
- bootstrap_servers_config允许您定义apache kafka服务器的运行位置
- key_serializer_class_config告诉生产者要使用什么格式发送消息键。
- 使用value_serializer_class_config属性定义发送实际消息的格式。
由于您将发送文本消息,因此这两个属性都设置为使用stringserializer.class
。
为了实际向您的主题发送消息,您需要使用producer.send()
方法,该方法接受一个producerrecord
。以下代码提供了一个将消息发送到主题并打印响应以及消息偏移量的方法。
public void produce(string topic, string message) throws executionexception, interruptedexception {
producerrecord record = new producerrecord(topic, message);
final future send = this.producer.send(record);
final recordmetadata recordmetadata = send.get();
system.out.println(recordmetadata);
}
完成所有代码后,您现在可以向主题发送消息。您可以使用一个main
方法来测试,如下所示:
package org.example.kafka;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
import org.apache.kafka.common.serialization.stringserializer;
import java.util.properties;
import java.util.concurrent.executionexception;
import java.util.concurrent.future;
public class simpleproducer {
private final kafkaproducer producer;
public simpleproducer(string host, string port) {
string server = host + ":" + port;
properties properties = new properties();
properties.setproperty(producerconfig.bootstrap_servers_config, server);
properties.setproperty(producerconfig.key_serializer_class_config, stringserializer.class.getname());
properties.setproperty(producerconfig.value_serializer_class_config, stringserializer.class.getname());
this.producer = new kafkaproducer(properties);
}
public void produce(string topic, string message) throws executionexception, interruptedexception {
producerrecord record = new producerrecord(topic, message);
final future send = this.producer.send(record);
final recordmetadata recordmetadata = send.get();
system.out.println(recordmetadata);
}
public static void main(string[] args) throws exception{
simpleproducer producer = new simpleproducer("localhost", "9092");
producer.produce("transactions", "this is a transactional record of $200");
}
}
在此代码中,您正在创建一个simpleproducer
,它连接到您的本地机器上的apache kafka服务器。它在内部使用kafkaproducer
在您的主题上产生文本消息。
创建apache kafka java消费者
是时候使用java客户端创建一个apache kafka消费者了。创建一个名为simpleconsumer.java
的类。接下来,你将为这个类创建一个构造函数,该构造函数初始化org.apache.kafka.clients.consumer.kafkaconsumer
。为了创建消费者,你需要提供apache kafka服务器运行的主机和端口。此外,你还需要提供消费者组以及要从中消费的主题。使用下面给出的代码片段:
package org.example.kafka;
import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import org.apache.kafka.common.serialization.stringdeserializer;
import java.time.duration;
import java.util.list;
import java.util.properties;
import java.util.concurrent.atomic.atomicboolean;
public class simpleconsumer {
private static final string offset_reset = "earliest";
private final kafkaconsumer consumer;
private boolean keepconsuming = true;
public simpleconsumer(string host, string port, string consumergroupid, string topic) {
string server = host + ":" + port;
properties properties = new properties();
properties.setproperty(consumerconfig.bootstrap_servers_config, server);
properties.setproperty(consumerconfig.group_id_config, consumergroupid);
properties.setproperty(consumerconfig.auto_offset_reset_config, offset_reset);
properties.setproperty(consumerconfig.key_deserializer_class_config, stringdeserializer.class.getname());
properties.setproperty(consumerconfig.value_deserializer_class_config, stringdeserializer.class.getname());
this.consumer = new kafkaconsumer(properties);
this.consumer.subscribe(list.of(topic));
}
}
与kafka生产者相似,kafka消费者也接受一个properties对象。让我们看看设置的所有不同属性:
- bootstrap_servers_config告诉消费者apache kafka服务器的运行位置
- 使用group_id_config来指定消费者组
- 当消费者开始消费时,auto_offset_reset_config允许你指定从多远的位置开始消费消息
- key_deserializer_class_config告诉消费者消息键的类型
- value_deserializer_class_config告诉消费者实际消息的类型
由于在你的情况下,你将消费文本消息,所以反序列化属性设置为stringdeserializer.class
。
现在你将从主题中消费消息。为了保持简单,一旦消费了消息,你将把消息打印到控制台上。让我们看看你可以使用下面的代码来实现这个目标:
private boolean keepconsuming = true;
public void consume() {
while (keepconsuming) {
final consumerrecords consumerrecords = this.consumer.poll(duration.ofmillis(100l));
if (consumerrecords != null && !consumerrecords.isempty()) {
consumerrecords.iterator().foreachremaining(consumerrecord -> {
system.out.println(consumerrecord.value());
});
}
}
}
这段代码将会不断地轮询主题。当接收到任何消费记录时,消息将会被打印出来。使用主方法来测试你的消费者。你将启动一个java应用程序,它将不断地消费主题并打印消息。停止java应用程序以终止消费者。
package org.example.kafka;
import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import org.apache.kafka.common.serialization.stringdeserializer;
import java.time.duration;
import java.util.list;
import java.util.properties;
import java.util.concurrent.atomic.atomicboolean;
public class simpleconsumer {
private static final string offset_reset = "earliest";
private final kafkaconsumer consumer;
private boolean keepconsuming = true;
public simpleconsumer(string host, string port, string consumergroupid, string topic) {
string server = host + ":" + port;
properties properties = new properties();
properties.setproperty(consumerconfig.bootstrap_servers_config, server);
properties.setproperty(consumerconfig.group_id_config, consumergroupid);
properties.setproperty(consumerconfig.auto_offset_reset_config, offset_reset);
properties.setproperty(consumerconfig.key_deserializer_class_config, stringdeserializer.class.getname());
properties.setproperty(consumerconfig.value_deserializer_class_config, stringdeserializer.class.getname());
this.consumer = new kafkaconsumer(properties);
this.consumer.subscribe(list.of(topic));
}
public void consume() {
while (keepconsuming) {
final consumerrecords consumerrecords = this.consumer.poll(duration.ofmillis(100l));
if (consumerrecords != null && !consumerrecords.isempty()) {
consumerrecords.iterator().foreachremaining(consumerrecord -> {
system.out.println(consumerrecord.value());
});
}
}
}
public static void main(string[] args) {
simpleconsumer simpleconsumer = new simpleconsumer("localhost", "9092", "transactions-consumer", "transactions");
simpleconsumer.consume();
}
}
当你运行这段代码时,你会注意到它不仅消费了由你的java生产者产生的消息,还消费了你通过控制台生产者产生的消息。这是因为将auto_offset_reset_config
属性设置为earliest
。
在simpleconsumer运行的情况下,你可以使用控制台生产者或simpleproducer java应用程序向主题中进一步产生消息。你会看到它们被消费并打印在控制台上。
使用apache kafka满足所有数据管道需求
apache kafka让你轻松处理所有的数据管道需求。在你的本地机器上设置apache kafka后,你可以探索kafka提供的所有不同功能。此外,官方的java客户端让你可以高效地编写、连接和与你的apache kafka服务器通信。
作为一个多功能、可扩展、性能非常高的数据流系统,apache kafka可以真正改变你的游戏规则。你可以将它用于本地开发,甚至将其集成到你的生产系统中。就像在本地设置很容易一样,为更大的应用程序设置apache kafka也不是一件大事。
如果你正在寻找数据流平台,你可以查看用于实时分析和处理的最佳流数据平台。