Skip to content

在 AWS EC2 上配置 MSK 交互相关的环境并创建 Topic

标签
基础设施/云服务商/亚马逊云/AWS
基础设施/消息中间件/Kafka/亚马逊云/AWS/MSK
基础设施/消息中间件/Kafka
开发/后端/消息队列/Kafka
操作系统/CentOS
操作系统/CentOS/CentOS-8
命令行/wget
开发/语言/Java
开发/语言/Java/JDK
字数
849 字
阅读时间
5 分钟

安装 Java

shell
sudo yum -y install java-11

下载 Kafka

用下面的命令下载 Kafka,Kafka 的版本号应该要与 AWS Console 中的版本号一一对应

shell
wget https://archive.apache.org/dist/kafka/<Kafka 版本>/kafka_2.13-<Kafka 版本>.tgz

以集群为 3.5.1 版本的 Kafka 为例:

shell
wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz

这样下载就好了

shell
$ wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz
--2024-01-28 10:03:13--  https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 106748875 (102M) [application/x-gzip]
Saving to: ‘kafka_2.13-3.5.1.tgz’

100%[=====================================================================================================>] 106,748,875 11.2MB/s   in 13s    

2024-01-28 10:03:27 (7.86 MB/s) - ‘kafka_2.13-3.5.1.tgz’ saved [106748875/106748875]

然后解压缩

shell
tar -xzf kafka_2.13-3.5.1.tgz
[ec2-user@ip-10-100-105-95 ~]$ ls
app  etoplus-mds  home  kafka_2.13-3.5.1  kafka_2.13-3.5.1.tgz

为环境进行额外配置

下载 AWS MSK 拓展库

在 EC2 上与 MSK 交互,需要使用到 AWS IAM 在 EC2 上的认证和鉴权机制,而这样的机制是要求我们安装额外的鉴权拓展类的,可以通过下面的命令下载

shell
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar

下载好之后应该是这样的

shell
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
--2024-01-28 10:03:56--  http://wget/
Resolving wget (wget)... failed: Name or service not known.
wget: unable to resolve host address ‘wget’
--2024-01-28 10:03:57--  https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/292053394/b0314dc8-42a6-4855-8114-7de9dc35c48b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240128%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240128T100357Z&X-Amz-Expires=300&X-Amz-Signature=47a72642671b2a24eca100c163ab3030a02fa51b7ca2e7c9bf321b14f33199e7&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=292053394&response-content-disposition=attachment%3B%20filename%3Daws-msk-iam-auth-1.1.1-all.jar&response-content-type=application%2Foctet-stream [following]
--2024-01-28 10:03:57--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/292053394/b0314dc8-42a6-4855-8114-7de9dc35c48b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240128%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240128T100357Z&X-Amz-Expires=300&X-Amz-Signature=47a72642671b2a24eca100c163ab3030a02fa51b7ca2e7c9bf321b14f33199e7&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=292053394&response-content-disposition=attachment%3B%20filename%3Daws-msk-iam-auth-1.1.1-all.jar&response-content-type=application%2Foctet-stream
Resolving objects.githubusercontent.com (objects.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to objects.githubusercontent.com (objects.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12417891 (12M) [application/octet-stream]
Saving to: ‘aws-msk-iam-auth-1.1.1-all.jar’

100%[=====================================================================================================>] 12,417,891  2.82MB/s   in 4.8s   

2024-01-28 10:04:04 (2.47 MB/s) - ‘aws-msk-iam-auth-1.1.1-all.jar’ saved [12417891/12417891]

FINISHED --2024-01-28 10:04:04--
Total wall clock time: 7.5s
Downloaded: 1 files, 12M in 4.8s (2.47 MB/s)

移动到下载的 Kafka 的目录下的 libs 目录下面

shell
mv aws-msk-iam-auth-1.1.1-all.jar ./kafka_2.13-3.5.1/libs/
shell
vim ./kafka_2.13-3.5.1/bin/client.properties

配置客户端连接配置

内容如下

txt
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

而这其中的

sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

一行就是用于自动通过 EC2 机器上的 IAM 认证机制完成认证的拓展库的关联配置。

授予执行权限

shell
chmod +x ./kafka_2.13-3.5.1/bin/kafka-topics.sh

创建 Topic

shell
./kafka_2.13-<Kafka 版本>/bin/kafka-topics.sh --create --bootstrap-server <连接字符> --command-config ./kafka_2.13-<Kafka 按版本>/bin/client.properties --replication-factor <集群 Replica> --partitions <分区> --topic <Topic 的名>

比如这样

shell
./kafka_2.13-3.5.1/bin/kafka-topics.sh --create --bootstrap-server <连接字符> --command-config ./kafka_2.13-3.5.1/bin/client.properties --replication-factor 2 --partitions 1 --topic test.consumer

问题排查

创建时遭遇 Replication factor: 3 larger than available brokers: 2

shell
./kafka_2.13-3.5.1/bin/kafka-topics.sh --create --bootstrap-server <连接字符> --command-config ./kafka_2.13-3.5.1/bin/client.properties --replication-factor 3 --partitions 1 --topic test.consumer
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
[2024-01-28 10:07:56,910] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
 (kafka.admin.TopicCommand$)

这是因为连接到的集群的 Replica(副本)数量和命令要求的不一致导致的,不可以在创建 Topic 的时候指定超过 Replica(副本)数量的「副本倍率」,需要至少是 Replica(副本)数量的整数倍或者能被整除才行。

创建时遭遇 Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found.

shell
$ ./kafka_2.13-3.5.1/bin/kafka-topics.sh --create --bootstrap-server <连接字符> --command-config ./kafka_2.13-3.5.1/bin/client.properties --replication-factor 3 --partitions 1 --topic test.consumer

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value software.amazon.msk.auth.iam.IAMClientCallbackHandler for configuration sasl.client.callback.handler.class: Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:112)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:145)
        at org.apache.kafka.clients.admin.AdminClientConfig.<init>(AdminClientConfig.java:245)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:134)
        at kafka.admin.TopicCommand$TopicService$.createAdminClient(TopicCommand.scala:203)
        at kafka.admin.TopicCommand$TopicService$.apply(TopicCommand.scala:207)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:51)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

之前下载好的 AWS Auth 拓展类需要移动到 Kafka 安装目录的 libs 目录下面:

shell
mv aws-msk-iam-auth-1.1.1-all.jar ./kafka_2.13-3.5.1/libs/

参考资料

Step 4: Create a topic - Amazon Managed Streaming for Apache Kafka

贡献者

页面历史

撰写