引入rabbitmq/应用解偶/流量削峰,引入redisson实现分布式锁

This commit is contained in:
xiongxiaoyang 2020-06-02 22:32:33 +08:00
parent 8885730e77
commit 1bda806862
11 changed files with 264 additions and 31 deletions

View File

@ -41,9 +41,6 @@
</dependency>
</dependencies>

View File

@ -13,6 +13,7 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
@ -34,6 +35,8 @@ public class BookController {
private final BookService bookService;
private final RabbitTemplate rabbitTemplate;
/**
* 小说分类列表查询接口
*/
@ -67,7 +70,7 @@ public class BookController {
@ApiOperation("点击量新增接口")
@PostMapping("addVisitCount")
public ResultBean addVisitCount(@ApiParam("小说ID") @RequestParam("bookId") Long bookId) {
bookService.addVisitCount(bookId, 1);
rabbitTemplate.convertAndSend("ADD-BOOK-VISIT-EXCHANGE", null, bookId);
return ResultBean.ok();
}

View File

@ -0,0 +1,70 @@
package com.java2nb.novel.book.listener;
import com.java2nb.novel.book.service.BookService;
import com.java2nb.novel.common.cache.CacheKey;
import com.java2nb.novel.common.cache.CacheService;
import com.java2nb.novel.common.utils.Constants;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 11797
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class BookVisitAddListener {
private final BookService bookService;
private final CacheService cacheService;
private final RedissonClient redissonClient;
/**
* 更新数据库
* 流量削峰每本小说累积10个点击更新一次
*/
@SneakyThrows
@RabbitListener(queues = {"UPDATE-DB-QUEUE"})
public void updateDb(Long bookId, Channel channel, Message message) {
log.debug("收到更新数据库消息:" + bookId);
RLock lock = redissonClient.getLock("addVisitCountToDb");
lock.lock();
try {
Integer visitCount = (Integer) cacheService.getObject(CacheKey.BOOK_ADD_VISIT_COUNT + bookId);
if (visitCount == null) {
visitCount = 0;
}
cacheService.setObject(CacheKey.BOOK_ADD_VISIT_COUNT + bookId, ++visitCount);
if (visitCount >= Constants.ADD_MAX_VISIT_COUNT) {
bookService.addVisitCount(bookId, visitCount);
cacheService.del(CacheKey.BOOK_ADD_VISIT_COUNT + bookId);
}
}catch (Exception e){
log.error("更新数据库失败"+bookId);
}
lock.unlock();
Thread.sleep(1000 * 2);
}
}

View File

@ -3,3 +3,15 @@ spring:
name: book-service
profiles:
active: dev
cloud:
nacos:
config:
extconfig[0]:
dataid: novel-redis.yml
group: novel-common
refresh: true
extconfig[1]:
dataid: novel-rabbitmq.yml
group: novel-common
refresh: true

View File

@ -48,6 +48,12 @@
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${redisson.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
@ -105,6 +111,11 @@
<artifactId>feign-httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

View File

@ -0,0 +1,62 @@
package com.java2nb.novel.common.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 11797
*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host", matchIfMissing = false)
public class RabbitConfig {
/**
* 更新数据库队列
*/
@Bean
public Queue updateDbQueue() {
return new Queue("UPDATE-DB-QUEUE", true);
}
/**
* 更新数据库队列
*/
@Bean
public Queue updateEsQueue() {
return new Queue("UPDATE-ES-QUEUE", true);
}
/**
* 增加点击量交换机
*/
@Bean
public FanoutExchange addVisitExchange() {
return new FanoutExchange("ADD-BOOK-VISIT-EXCHANGE");
}
/**
* 更新搜索引擎队列绑定到增加点击量交换机中
*/
@Bean
public Binding updateEsBinding() {
return BindingBuilder.bind(updateEsQueue()).to(addVisitExchange());
}
/**
* 更新数据库绑定到增加点击量交换机中
*/
@Bean
public Binding updateDbBinding() {
return BindingBuilder.bind(updateDbQueue()).to(addVisitExchange());
}
}

View File

@ -19,6 +19,12 @@ feign:
httpclient:
enabled: true
#关掉mq的健康检查防止某些没有用到mq的服务启动报错个别服务如需mq监控单独开启
management:
health:
rabbit:
enabled: false

View File

@ -0,0 +1,67 @@
package com.java2nb.novel.search.listener;
import com.java2nb.novel.book.entity.Book;
import com.java2nb.novel.common.cache.CacheKey;
import com.java2nb.novel.common.cache.CacheService;
import com.java2nb.novel.search.feign.BookFeignClient;
import com.java2nb.novel.search.service.SearchService;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 11797
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class BookVisitAddListener {
private final CacheService cacheService;
private final SearchService searchService;
private final RedissonClient redissonClient;
private final BookFeignClient bookFeignClient;
/**
* 更新搜索引擎
* 流量削峰每本小说1个小时更新一次
*/
@RabbitListener(queues = {"UPDATE-ES-QUEUE"})
public void updateEs(Long bookId, Channel channel, Message message) {
log.debug("收到更新搜索引擎消息:" + bookId);
RLock lock = redissonClient.getLock("addVisitCountToEs");
lock.lock();
if (cacheService.get(CacheKey.ES_IS_UPDATE_VISIT + bookId) == null) {
cacheService.set(CacheKey.ES_IS_UPDATE_VISIT + bookId, "1", 60 * 60);
try {
Thread.sleep(1000 * 5);
Book book = bookFeignClient.queryBookById(bookId);
searchService.importToEs(book);
}catch (Exception e){
cacheService.del(CacheKey.ES_IS_UPDATE_VISIT + bookId);
log.error("更新搜索引擎失败"+bookId);
}
}
lock.unlock();
}
}

View File

@ -7,6 +7,8 @@ import com.java2nb.novel.search.feign.BookFeignClient;
import com.java2nb.novel.search.service.SearchService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@ -16,6 +18,7 @@ import java.util.List;
/**
* 小说数据导入搜索引擎定时任务
*
* @author xiongxiaoyang
* @version 1.0
* @since 2020/5/27
@ -30,19 +33,19 @@ public class BookToEsSchedule {
private final CacheService cacheService;
private final SearchService searchService;
private final RedissonClient redissonClient;
/**
* 1分钟导入一次
*/
@Scheduled(fixedRate = 1000 * 60)
public void saveToEs() {
//TODO 引入Redisson框架实现分布式锁
//可以重复更新只是效率可能略有降低所以暂不实现分布式锁
if (cacheService.get(CacheKey.ES_TRANS_LOCK) == null) {
cacheService.set(CacheKey.ES_TRANS_LOCK, "1", 60 * 20);
RLock lock = redissonClient.getLock("saveToEs");
lock.lock();
try {
//查询需要更新的小说
Date lastDate = (Date) cacheService.getObject(CacheKey.ES_LAST_UPDATE_TIME);
@ -60,19 +63,17 @@ public class BookToEsSchedule {
}
cacheService.setObject(CacheKey.ES_LAST_UPDATE_TIME, lastDate);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
cacheService.del(CacheKey.ES_TRANS_LOCK);
lock.unlock();
}
}
}

View File

@ -10,3 +10,7 @@ spring:
dataid: novel-redis.yml
group: novel-common
refresh: true
extconfig[1]:
dataid: novel-rabbitmq.yml
group: novel-common
refresh: true