
Linuxeden 开源社区 --
Spring for Apache Kafka 2.1.0 已发布,同时发布的还有 1.3.2 和 2.0.2 维护版本,包含重要的 Bug 修复。
2.1.0 版本的主要将 kafka-clients 库升级到 1.0.0,以及一些改进:
- Sometimes, when a message can’t be processed, you may wish to stop the container so the condition can be corrected and the message re-delivered. The framework now provides the
ContainerStoppingErrorHandlerfor record listeners andContainerStoppingBatchErrorHandlerfor batch listeners. - The
KafkaAdminnow supports increasing partitions when aNewTopicbean is detected with a larger number of partitions than currently exist on the topic. StringJsonMessageConverterandJsonSerializer/JsonDeserializernow pass and consume type information inHeaders. This allows multiple types to be easily sent/received on the same topic:
@SpringBootApplication
public class Kafka21Application {
public static void main(String[] args) {
SpringApplication.run(Kafka21Application.class, args)
.close();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
return args -> {
template.send(MessageBuilder.withPayload(42)
.setHeader(KafkaHeaders.TOPIC, "blog")
.build());
template.send(MessageBuilder.withPayload("43")
.setHeader(KafkaHeaders.TOPIC, "blog")
.build());
Thread.sleep(5_000);
};
}
@Bean
public StringJsonMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Component
@KafkaListener(id = "multi", topics = "blog")
public static class Listener {
@KafkaHandler
public void intListener(Integer in) {
System.out.println("Got an int: " + in);
}
@KafkaHandler
public void stringListener(String in) {
System.out.println("Got a string: " + in);
}
}
}
Got an int: 42 Got a string: 43
- the
JsonSerializerandJsonDeserializercan be configured using kafka properties for the producer/consumer.
转自 http://ift.tt/2jCqXcS
The post Spring For Apache Kafka 2.1.0 和 1.3.2 发布 appeared first on Linuxeden开源社区.
http://ift.tt/2i8qoHm
没有评论:
发表评论