zhaoyy0513

zhaoyy0513

V2EX 第 305963 号会员,加入于 2018-04-04 12:01:01 +08:00
zhaoyy0513 最近回复了
@yangxin0 招 JAVA 吗 老哥
2023-05-10 10:15:03 +08:00
回复了 CoCoMcRee 创建的主题 分享创造 我把 ChatGPT+ Midjourney 接入了钉钉群聊.
该群不存在或群主未开启群可被搜索功能 ...
2023-05-09 11:50:14 +08:00
回复了 Braisdom 创建的主题 程序员 智能 SQL 分析系统(我的新作品)
等一波开源
2023-05-08 10:18:17 +08:00
回复了 slomo 创建的主题 Linux 我该用哪个 Linux 发行版
@onice 确实很不错 打算用来做开发环境了
2023-05-06 10:21:21 +08:00
回复了 slomo 创建的主题 Linux 我该用哪个 Linux 发行版
推荐 debian 的 mint 我现在就是用的 mint 回复你的
2023-04-25 16:37:30 +08:00
回复了 NoKey 创建的主题 程序员 请教, kafka 如何做到一个 topic 分发不同的类型的消息
要实现上游系统 A 将消息发送到下游系统 B 、C 、D ,并确保每个下游系统只处理自己需要处理的消息,同时还要确保消息只被消费一次,可以采用以下方案:

使用 Kafka 作为消息中间件,将上游系统 A 发送的消息发布到一个名为"topic-A"的 Kafka 主题中。

在下游系统 B 、C 、D 中,创建三个不同的消费者组,分别为"group-B"、"group-C"、"group-D",并订阅"topic-A"主题。

在消费者端,使用 Kafka 中的消息过滤器来过滤掉不需要的消息,只选择要处理的消息。可以使用 Kafka 中的消息键(key)来实现过滤。例如,下游系统 B 只想处理键(key)为"key-B"的消息,可以使用以下代码来实现:

java
Copy
// 创建 Kafka 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-B");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅"topic-A"主题
consumer.subscribe(Collections.singletonList("topic-A"));

// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.key().equals("key-B")) {
// 处理消息
}
}
consumer.commitSync();
}
```

为了确保消息只被消费一次,将消费者的 auto.offset.reset 属性设置为"earliest",并启用自动提交偏移量。这将确保消费者在启动时从最早可用的偏移量开始消费,以避免漏掉任何消息,并且将自动提交偏移量以确保每个消息只被消费一次。例如,可以使用以下代码来实现:

java
Copy
// 创建 Kafka 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-B");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```
使用上述方案,上游系统 A 可以将消息发送到"topic-A"主题中,下游系统 B 、C 、D 可以使用 Kafka 消费者订阅该主题,并使用消息过滤器来过滤掉不需要的消息,只选择要处理的消息。自动提交偏移量将确保每个消息只被消费一次。





上面两条回复都是 chatgpt 回复的
2023-04-25 16:23:45 +08:00
回复了 NoKey 创建的主题 程序员 请教, kafka 如何做到一个 topic 分发不同的类型的消息
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class KafkaStreamsExample {
public static void main(String[] args) {
// 设置 Kafka Streams 属性
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// 创建 Kafka Streams 实例
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("topic-A");

// 根据消息的 key 将消息路由到不同的分区中
stream.selectKey((key, value) -> key)
.through("topic-A-shuffle")
.groupByKey()
.foreach((key, value) -> {
// 处理消息
System.out.println("Processed message: " + value);
});

// 将处理后的消息发送到下游服务
stream.mapValues(value -> "processed " + value)
.to("topic-B", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();
}
}

在上面的代码中,首先使用 selectKey()方法将消息的 key 作为新的 key ,然后使用 through()方法将消息发送到一个新的 Topic 中,这个新的 Topic 会使用 Kafka 默认的分区策略将消息路由到不同的分区中。然后,我们使用 groupByKey()方法将同一个 key 的消息分组,确保每个消费者只消费自己需要的消息。最后,我们使用 foreach()方法处理分组后的消息,并使用 mapValues()方法将处理后的消息发送到下游服务。

需要注意的是,使用分流操作可能会导致数据倾斜(data skew)问题,因为某些 key 的消息可能比其他 key 的消息更频繁,从而导致某些分区比其他分区拥有更多的消息。为了解决这个问题,可以使用一些分区策略(partitioning strategy),例如随机分配、循环分配、哈希分配等。
2023-04-25 16:12:30 +08:00
回复了 NoKey 创建的主题 程序员 请教, kafka 如何做到一个 topic 分发不同的类型的消息
@Super8 我创建的 KafkaConsumer 用到的 api 里面没有这两个参数的方法啊老哥,你说的这个 kafka 是哪个版本的啊
关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   4612 人在线   最高记录 6679   ·     Select Language
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.8.5 · 17ms · UTC 05:35 · PVG 13:35 · LAX 21:35 · JFK 00:35
Developed with CodeLauncher
♥ Do have faith in what you're doing.