RabbitMQ Java Stream Client and RabbitMQ Kubernetes Operator

I deployed a RabbitMQ cluster on Kubernetes using the RabbitMQ Operator and enabled the rabbitmq_stream plugin. This is my YAML:

apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq-deployment
  namespace: rabbitmq-namespace
spec:
  replicas: 2
  image: rabbitmq:3.11.13
  persistence:
    storage: 20Gi
  service:
    type: LoadBalancer
  rabbitmq:
    additionalPlugins:
      - rabbitmq_stream
      - rabbitmq_stream_management

I also use the RabbitMQ Java Stream Client, and I connect to the cluster like this:

EnvironmentBuilder environmentBuilder = Environment.builder();
environmentBuilder.host(System.getenv("RABBITMQ_HOST"));
environmentBuilder.port(Integer.parseInt(System.getenv("RABBITMQ_STREAM_PORT")));
environmentBuilder.username(System.getenv("RABBITMQ_USERNAME"));
environmentBuilder.password(System.getenv("RABBITMQ_PASSWORD"));
mainConnection = environmentBuilder.build();

When I use this client to create the stream, it works flawlessly and no error is reported:

mainConnection.streamCreator().stream("mystream").maxAge(Duration.of(1, ChronoUnit.DAYS)).create();

However, when I try to produce messages like this:

Producer producer = RabbitMQStreamConnection.mainConnection.producerBuilder().stream("mystream").build();
byte[] messagePayload = "hello".getBytes(StandardCharsets.UTF_8);
producer.send(
    producer.messageBuilder().addData(messagePayload).build(),
    confirmationStatus -> {
        if (confirmationStatus.isConfirmed()) {
            // the message made it to the broker
        } else {
            // the message did not make it to the broker
        }
});

It throws this exception:

com.rabbitmq.stream.StreamException
Error while creating stream connection to rabbitmq-deployment-server-0.rabbitmq-deployment-nodes.rabbitmq-namespace:5552

I assume this happens because there are two nodes (replicas: 2) and the client is redirected to connect directly to an individual node rather than going through the load balancer.

What I want is to be able to produce and consume messages from the stream.

Right now I have no idea what to try next to solve this problem.

Solution:

You should use the client's load balancer configuration.

See: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#when-a-load-balancer-is-in-use

A load balancer can misguide the client when it tries to connect to nodes that host stream leaders and replicas. The "Connecting to Streams" blog post covers why client applications must connect to the appropriate nodes in a cluster and how a load balancer can make things complicated for them.

The EnvironmentBuilder#addressResolver(AddressResolver) method allows intercepting the node resolution after metadata hints and before connection. Applications can use this hook to ignore metadata hints and always use the load balancer, as illustrated in the following snippet:

Using a custom address resolver to always use a load balancer

Address entryPoint = new Address("my-load-balancer", 5552);
Environment environment = Environment.builder()
    .host(entryPoint.host())
    .port(entryPoint.port())
    .addressResolver(address -> entryPoint)
    .build();
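
To make the effect of the resolver hook more concrete, here is a minimal, library-free sketch of the idea. It is only a model: the Address record, the ResolverSketch class, and the resolve method are hypothetical stand-ins written for illustration and are not part of the RabbitMQ stream client. The internal host name is the one from the exception in the question.

```java
import java.util.function.UnaryOperator;

// Simplified model of address resolution; NOT the RabbitMQ client API.
final class ResolverSketch {

    // Hypothetical stand-in for the client's address type (host + port).
    public record Address(String host, int port) {}

    // Models the step between "metadata hint received" and "connection opened":
    // the hinted address is passed through the resolver before it is used.
    public static Address resolve(Address metadataHint, UnaryOperator<Address> resolver) {
        return resolver.apply(metadataHint);
    }

    public static void main(String[] args) {
        // Address reachable from outside the cluster (assumed load balancer name).
        Address entryPoint = new Address("my-load-balancer", 5552);

        // Address the broker advertises for the node hosting the stream leader.
        // It is a cluster-internal name, so an external client cannot reach it.
        Address hint = new Address(
            "rabbitmq-deployment-server-0.rabbitmq-deployment-nodes.rabbitmq-namespace", 5552);

        // Without a custom resolver the hint is used as-is.
        Address defaultTarget = resolve(hint, UnaryOperator.identity());

        // With the custom resolver every hint is replaced by the load balancer.
        Address overriddenTarget = resolve(hint, address -> entryPoint);

        System.out.println("default target:    " + defaultTarget);
        System.out.println("overridden target: " + overriddenTarget);
    }
}
```

In the setup from the question, the entry point would presumably be the address of the LoadBalancer service, that is, the values already read from RABBITMQ_HOST and RABBITMQ_STREAM_PORT, passed to the real client's addressResolver method as shown in the snippet above.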