diff --git a/pom.xml b/pom.xml index 59a09fe..8d50f37 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,11 @@ rocketmq-spring-boot-starter 2.2.0 + + com.google.guava + guava + 33.3.1-jre + org.springframework.boot diff --git a/src/main/java/cn/van/business/mq/MessageConsumerService.java b/src/main/java/cn/van/business/mq/MessageConsumerService.java index 634c0c9..cb2b074 100644 --- a/src/main/java/cn/van/business/mq/MessageConsumerService.java +++ b/src/main/java/cn/van/business/mq/MessageConsumerService.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.http.HttpRequest; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson2.JSONObject; +import com.google.common.util.concurrent.RateLimiter; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.slf4j.Logger; @@ -27,10 +28,13 @@ public class MessageConsumerService implements RocketMQListener { private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class); + // create a rate limiter of 1 qps + RateLimiter rateLimiter = RateLimiter.create(1.0); @Override public void onMessage(JSONObject jsonObject) { // 处理消息 try { + rateLimiter.acquire(); // 请求许可。如果超过速率,则此方法会阻塞 logger.info("消费到消息:{}", jsonObject); String body = jsonObject.getString("body"); byte[] decodedBody = Base64.getDecoder().decode(body);