@RunWith(SpringRunner.class) @Import(KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest(classes = EmbeddedKafkaApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest {
@ClassRule public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired public KafkaTemplate<String, String> template;
@Autowired private KafkaConsumer consumer;
@Autowired private KafkaProducer producer;
@Value("${test.topic}") private String topic;
@Test public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { template.send(topic, "Sending with default template"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), CoreMatchers.equalTo(0L)); assertThat(consumer.getPayload(), CoreMatchers.containsString("embedded-test-topic")); }
@Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own controller"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), CoreMatchers.equalTo(0L)); assertThat(consumer.getPayload(), CoreMatchers.containsString("embedded-test-topic")); }
@TestConfiguration static class KafkaTestContainersConfiguration {
@Bean ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }
@Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; }
@Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); }
@Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
} }
|