[SOLVED] Spring Cloud Stream StreamBridge low performance?

Issue

I’m using Spring Cloud Stream’s StreamBridge to publish messages to a RabbitMQ exchange. With the native RabbitMQ PerfTest tool I easily get up to 100k msgs/s using a single producer (1 channel). If I launch a thread running a while loop that sends through StreamBridge (also 1 channel), I only get ~20k msgs/s with similar settings (no persistence, no manual acks or confirms, same Docker containers). I’m using Spring Cloud Stream and the Rabbit binder 3.2.2.

My YAML configuration looks like this:

spring:
  rabbitmq:
    host: localhost
    port: 5672

  cloud:
    function:
      definition: producer1;

    stream:
      bindings:
        producer1-out-0:
          destination: messageQueue
          #requiredGroups: consumerGroup1,
      rabbit:
        bindings:
          producer1-out-0:
            producer:
              deliveryMode: NON_PERSISTENT
              exchangeType: direct
              bindingRoutingKey: default_message
              routingKeyExpression: '''default_message'''
              #maxLength: 1
      output-bindings: producer1;

And here is my sending loop (the RabbitMQ PerfTest tool is also written in Java and works in a similar way):

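        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import javax.annotation.PostConstruct;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.cloud.stream.function.StreamBridge;
        import org.springframework.stereotype.Component;

        @Component
        public class ProducerLauncher {

            @Autowired
            private StreamBridge streamBridge;

            // Only one task is submitted below, so only one pool thread publishes
            private final ExecutorService executorService = Executors.newFixedThreadPool(10);

            @PostConstruct
            public void launchProducer() {
                Runnable task = () -> {
                    // Publish as fast as possible from a single thread
                    while (true) {
                        streamBridge.send("producer1-out-0", "msg");
                    }
                };
                executorService.submit(task);
            }
        }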
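The question does not show how the ~20k msgs/s figure was measured. A minimal, stdlib-only sketch of one way to measure a send rate follows; SendRateMeter and measure are hypothetical names, and the Runnable stands in for whatever performs the send (in the question, a lambda wrapping streamBridge.send(...)).

```java
import java.time.Duration;

/**
 * Hypothetical helper (not part of Spring): calls a send action in a tight
 * loop on the current thread and reports the observed rate.
 */
public final class SendRateMeter {

    private SendRateMeter() {
    }

    /** Runs {@code send} repeatedly for roughly {@code duration}; returns calls per second. */
    public static double measure(Runnable send, Duration duration) {
        long count = 0;
        long start = System.nanoTime();
        long deadline = start + duration.toNanos();
        long now;
        do {
            send.run();
            count++;
            // Reading the clock on every iteration adds a small per-call cost,
            // which is minor compared with a real broker send
            now = System.nanoTime();
        } while (now < deadline);
        double seconds = (now - start) / 1_000_000_000.0;
        return count / seconds;
    }

    public static void main(String[] args) {
        // Stand-in for a real send: increments a counter instead of publishing
        long[] sent = {0};
        double rate = measure(() -> sent[0]++, Duration.ofMillis(200));
        System.out.printf("%.0f calls/s over %d calls%n", rate, sent[0]);
    }
}
```

With Spring in place, the same helper could wrap the question’s call, e.g. SendRateMeter.measure(() -> streamBridge.send("producer1-out-0", "msg"), Duration.ofSeconds(10)), ideally after a warm-up run so that JIT compilation does not skew the first measurement.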

Also, at startup my console shows a strange message, Channel 'unknown.channel.name' has 1 subscriber(s), and I don’t know why.

Is the slow sending rate with StreamBridge an inherent Spring limitation, or have I misconfigured something?
Thanks for any help 🙂

Solution

There will always be some overhead when using an abstraction on top of the native API; however, 5x doesn’t sound right.

> I’m using -x 1 -y 1 -a as arguments, which means only 1 producer is publishing messages, with auto consumer acks

That probably explains it, then: auto ack means no acks at all; the broker considers a message acknowledged as soon as it is sent to the consumer (risking message loss). The equivalent in Spring is AcknowledgeMode.NONE; the container’s default is to ack each message individually.

See
https://docs.spring.io/spring-amqp/docs/current/reference/html/#acknowledgeMode

and

https://docs.spring.io/spring-amqp/docs/current/reference/html/#batchSize
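To make the ack difference concrete, here is a simplified, hypothetical model of how many ack frames a consumer sends back to the broker. It assumes that with broker auto-ack (PerfTest -a, Spring’s AcknowledgeMode.NONE) there are none, while with container acks one basic.ack is sent per batch of batchSize messages (batchSize = 1 meaning one ack per message). AckTraffic, AckStyle and ackFrames are illustrative names, and the model ignores receive timeouts, redeliveries and multiple consumers.

```java
/**
 * Hypothetical, simplified model of consumer-side ack traffic.
 * It ignores receive timeouts, redeliveries and multiple consumers.
 */
public final class AckTraffic {

    /** AUTO_ACK ~ RabbitMQ autoAck=true (Spring AcknowledgeMode.NONE); CONTAINER_ACK ~ container-driven acks. */
    public enum AckStyle { AUTO_ACK, CONTAINER_ACK }

    private AckTraffic() {
    }

    /** Number of basic.ack frames sent for {@code messages} deliveries. */
    public static long ackFrames(long messages, AckStyle style, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        if (style == AckStyle.AUTO_ACK) {
            // The broker treats each message as acknowledged once it has been sent
            return 0;
        }
        // One ack (with multiple=true) per full or partial batch
        return (messages + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        System.out.println("auto-ack:        " + ackFrames(n, AckStyle.AUTO_ACK, 1));
        System.out.println("ack per message: " + ackFrames(n, AckStyle.CONTAINER_ACK, 1));
        System.out.println("batchSize 100:   " + ackFrames(n, AckStyle.CONTAINER_ACK, 100));
    }
}
```

Under this model a million deliveries cost a million ack frames with the default per-message acks, none with auto-ack, and far fewer once batchSize is raised.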

also

https://docs.spring.io/spring-amqp/docs/current/reference/html/#prefetchCount

Spring AMQP sets the prefetch count to 250 by default, but SCSt’s (Spring Cloud Stream’s) default is 1, which is significantly slower.
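A rough way to see why a prefetch of 1 hurts: when acks are enabled, the broker keeps at most prefetch unacked messages outstanding per consumer, so by Little’s law the consumer rate is bounded by prefetch divided by the time each message stays unacked (at least one round trip plus processing). The sketch below is a back-of-the-envelope upper bound under those assumptions; PrefetchBound is a hypothetical name and the 0.5 ms round-trip time is an illustrative value, not a measurement. With broker auto-ack, prefetch does not limit delivery at all.

```java
/**
 * Hypothetical back-of-the-envelope model, not a Spring or RabbitMQ API.
 * With acknowledgements enabled the broker keeps at most `prefetch` unacked
 * messages outstanding per consumer (basic.qos), so by Little's law the rate
 * is bounded by in-flight messages divided by the time each one stays in flight.
 */
public final class PrefetchBound {

    private PrefetchBound() {
    }

    /** Upper bound on messages per second for a single-threaded consumer. */
    public static double maxMessagesPerSecond(int prefetch, double roundTripMillis, double processingMillis) {
        if (prefetch < 1 || roundTripMillis + processingMillis <= 0) {
            throw new IllegalArgumentException("need prefetch >= 1 and a positive cycle time");
        }
        // Each message stays unacked for at least one round trip plus its processing time
        double windowLimited = prefetch * 1000.0 / (roundTripMillis + processingMillis);
        // The consumer thread itself cannot exceed 1 / processing time
        double processingLimited = processingMillis > 0 ? 1000.0 / processingMillis : Double.POSITIVE_INFINITY;
        return Math.min(windowLimited, processingLimited);
    }

    public static void main(String[] args) {
        double rttMillis = 0.5; // hypothetical round-trip time, not a measurement
        for (int prefetch : new int[] {1, 250}) {
            System.out.printf("prefetch %3d: <= %.0f msg/s%n",
                    prefetch, maxMessagesPerSecond(prefetch, rttMillis, 0.0));
        }
    }
}
```

Real consumers are also limited by the client, the network and the broker, so these figures are ceilings rather than predictions; the point is only that raising the prefetch from 1 raises the ceiling proportionally until processing time dominates.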

EDIT

Interesting: SCSt does appear to add significant overhead on top of Spring Integration alone.

The following application tests several scenarios, starting with the native Java client and adding more and more Spring abstractions on top, ending with StreamBridge. It should probably be profiled to see where the cost is and whether it can be mitigated.

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct

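logging.level.root=warn

import java.io.IOException;
import java.util.stream.IntStream;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.StopWatch;

@SpringBootApplication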
public class So71414000Application {

    public static void main(String[] args) {
        SpringApplication.run(So71414000Application.class, args).close();
    }

    @Bean
    ApplicationRunner runner1(CachingConnectionFactory cf) throws Exception {
        return args -> {
            /*
             * Native java API
             */
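            // The String argument is only the client-provided connection name, not a URI;
            // host and port come from the underlying RabbitMQ connection factory
            Connection conn = cf.getRabbitConnectionFactory().newConnection("native-perf-test");
            Channel channel = conn.createChannel();
            byte[] msg = "msg".getBytes();
            // deliveryMode 1 = non-persistent
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(1).build();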
            int count = 1000000;
            StopWatch watch = watch("native");
            IntStream.range(0, count).forEach(i -> {
                try {
                    channel.basicPublish("foo", "", props, msg);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            });
            perf(count, watch);
            channel.close();
            conn.close();
        };
    }

    @Bean
    ApplicationRunner runner2(RabbitTemplate template) {
        return args -> {
            /*
             * Single ChannelProxy, no cache, no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("nocache");
            int count = 1000000;
            template.invoke(t -> {
                IntStream.range(0, count).forEach(i -> t.send("foo", "", msg));
                return null;
            });
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner3(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("cached channel");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.send("foo", "", msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner4(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), conversion
             */
            StopWatch watch = watch("message conversion");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.convertAndSend("foo", "", "msg"));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner5(RabbitTemplate template) {
        return args -> {
            /*
             * Spring Integration
             */
            AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(template);
            outbound.setExchangeName("foo");
            outbound.setRoutingKey("");
            DirectChannel channel = new DirectChannel();
            EventDrivenConsumer consumer = new EventDrivenConsumer(channel, outbound);
            consumer.start();
            GenericMessage<?> msg = new GenericMessage<>("foo".getBytes());
            StopWatch watch = watch("Spring Integration");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> channel.send(msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner6(StreamBridge bridge) {
        return args -> {
            /*
             * Stream bridge
             */
            StopWatch watch = watch("Stream Bridge");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> bridge.send("output", "msg"));
            perf(count, watch);
        };
    }


    private StopWatch watch(String name) {
        StopWatch watch = new StopWatch();
        watch.start(name);
        return watch;
    }

    private void perf(int count, StopWatch watch) {
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println((int) ((count) / (watch.getTotalTimeSeconds()) / 1000) + "k/s");
    }

}

These are the results on my MacBook Air (2018, 1.6 GHz i5) with a bare-metal broker:


 :: Spring Boot ::                (v2.6.4)

StopWatch '': running time = 10949129530 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
10949129530  100%  native

91k/s
StopWatch '': running time = 14175481691 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
14175481691  100%  nocache

70k/s
StopWatch '': running time = 16300449457 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
16300449457  100%  cached channel

61k/s
StopWatch '': running time = 18206111556 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
18206111556  100%  message conversion

54k/s
StopWatch '': running time = 26654581638 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
26654581638  100%  Spring Integration

37k/s
StopWatch '': running time = 102734493141 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
102734493141  100%  Stream Bridge

9k/s
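Expressed per message, the timings above make the cost of each layer easier to compare. A small sketch that recomputes the k/s figures with the same formula as perf() and adds microseconds per message and the slowdown relative to the native client; the nanosecond values are copied from the output above (1,000,000 sends per run), and PerfSummary is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Recomputes per-message cost from the StopWatch timings reported above (1,000,000 sends each). */
public final class PerfSummary {

    static final int COUNT = 1_000_000;

    private PerfSummary() {
    }

    /** Same formula as perf(): whole thousands of messages per second. */
    public static int kPerSecond(long nanos, int count) {
        double seconds = nanos / 1_000_000_000.0;
        return (int) (count / seconds / 1000);
    }

    /** Average cost of one send in microseconds. */
    public static double microsPerMessage(long nanos, int count) {
        return nanos / 1000.0 / count;
    }

    public static void main(String[] args) {
        // Insertion order is preserved so the output follows the runner order
        Map<String, Long> runs = new LinkedHashMap<>();
        runs.put("native", 10_949_129_530L);
        runs.put("nocache", 14_175_481_691L);
        runs.put("cached channel", 16_300_449_457L);
        runs.put("message conversion", 18_206_111_556L);
        runs.put("Spring Integration", 26_654_581_638L);
        runs.put("Stream Bridge", 102_734_493_141L);

        long nativeNanos = runs.get("native");
        for (Map.Entry<String, Long> run : runs.entrySet()) {
            long nanos = run.getValue();
            System.out.printf("%-20s %4dk/s %8.2f us/msg %6.2fx native%n",
                    run.getKey(), kPerSecond(nanos, COUNT),
                    microsPerMessage(nanos, COUNT), (double) nanos / nativeNanos);
        }
    }
}
```

By this arithmetic the native client spends about 11 µs per send, Spring Integration about 27 µs and StreamBridge about 103 µs, roughly 9.4 times the native cost.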

Answered By – Gary Russell

Answer Checked By – Dawn Plyler (BugsFixing Volunteer)
