Follow

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use
Contact

Cannot connect to the Kafka docker instance

I am using bitnami/kafka:latest and bitnami/zookeper:latest images.

My sample project in Github: https://github.com/KostasD21/kafka-docker-demo

if you run the command docker-compose up and then run the application, you will see that the Spring Boot application cannot connect to the kafka container (which is currently running under the host kafka:9092)

MEDevel.com: Open-source for Healthcare and Education

Collecting and validating open-source software for healthcare, education, enterprise, development, medical imaging, medical records, and digital pathology.

Visit Medevel

The stacktrace from the application:

2022-05-31 08:59:18.255  WARN 144414 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-foo-1, groupId=foo] Error connecting to node kafka:9092 (id: 1001 rack: null)

java.net.UnknownHostException: kafka
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1367) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1301) ~[na:na]
    at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:979) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1520) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1338) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247) ~[spring-kafka-2.8.4.jar:2.8.4]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

logs from the docker container:

2022-05-31 05:51:24,236] INFO [RequestSendThread controllerId=1001] Starting (kafka.controller.RequestSendThread)[2022-05-31 05:51:24,236] INFO [Controller id=1001] Currently active brokers in the cluster: Set(1001) (kafka.controller.KafkaController)[2022-05-31 05:51:24,236] INFO [Controller id=1001] Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)[2022-05-31 05:51:24,237] INFO [Controller id=1001] Current list of topics in the cluster: Set(__consumer_offsets, test-topic) (kafka.controller.KafkaController)[2022-05-31 05:51:24,237] INFO [Controller id=1001] Fetching topic deletions in progress (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] List of topics to be deleted:  (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] List of topics ineligible for deletion:  (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] Initializing topic deletion manager (kafka.controller.KafkaController)[2022-05-31 05:51:24,242] INFO [Topic Deletion Manager 1001] Initializing manager with initial deletions: Set(), initial ineligible deletions: Set() (kafka.controller.TopicDeletionManager)[2022-05-31 05:51:24,242] INFO [Controller id=1001] Sending update metadata request (kafka.controller.KafkaController)[2022-05-31 05:51:24,243] INFO [ExpirationReaper--1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2022-05-31 05:51:24,244] INFO [Controller id=1001 epoch=3] Sending UpdateMetadata request to brokers Set(1001) for 0 partitions (state.change.logger)[2022-05-31 05:51:24,249] INFO [ReplicaStateMachine controllerId=1001] Initializing replica state (kafka.controller.ZkReplicaStateMachine)[2022-05-31 05:51:24,252] INFO [ReplicaStateMachine controllerId=1001] Triggering online replica state changes (kafka.controller.ZkReplicaStateMachine)[2022-05-31 05:51:24,253] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to kafka:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)[2022-05-31 05:51:24,258] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)

Currently I am stuck on this communication. The problem has to do with docker networking probably but I am not an expert on this aspect.

>Solution :

Check this answer: https://stackoverflow.com/a/51634499/19059974

From the point of view of your application, Kafka is listening in localhost (kafka hostname would be if it is running in within the same docker network)

Essentially, you need to add a protocol mapper and a listener that announces that kafka is reachable at localhost (see previous answer), change the port where your kafka container binds to 29092 (in your docker-compose file) and have your app connect to localhost:29092

Add a comment

Leave a Reply

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use

Discover more from Dev solutions

Subscribe now to keep reading and get access to the full archive.

Continue reading