1
This commit is contained in:
5
pom.xml
5
pom.xml
@@ -55,6 +55,11 @@
|
|||||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||||
<version>2.2.0</version>
|
<version>2.2.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.guava</groupId>
|
||||||
|
<artifactId>guava</artifactId>
|
||||||
|
<version>33.3.1-jre</version>
|
||||||
|
</dependency>
|
||||||
<!-- Spring Boot Starter Test,测试依赖 -->
|
<!-- Spring Boot Starter Test,测试依赖 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import cn.hutool.core.util.ObjectUtil;
|
|||||||
import cn.hutool.http.HttpRequest;
|
import cn.hutool.http.HttpRequest;
|
||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -27,10 +28,13 @@ public class MessageConsumerService implements RocketMQListener<JSONObject> {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);
|
||||||
|
|
||||||
|
// create a rate limiter of 1 qps
|
||||||
|
RateLimiter rateLimiter = RateLimiter.create(1.0);
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(JSONObject jsonObject) {
|
public void onMessage(JSONObject jsonObject) {
|
||||||
// 处理消息
|
// 处理消息
|
||||||
try {
|
try {
|
||||||
|
rateLimiter.acquire(); // 请求许可。如果超过速率,则此方法会阻塞
|
||||||
logger.info("消费到消息:{}", jsonObject);
|
logger.info("消费到消息:{}", jsonObject);
|
||||||
String body = jsonObject.getString("body");
|
String body = jsonObject.getString("body");
|
||||||
byte[] decodedBody = Base64.getDecoder().decode(body);
|
byte[] decodedBody = Base64.getDecoder().decode(body);
|
||||||
|
|||||||
Reference in New Issue
Block a user