feat: 集成分布式任务调度 XXL-JOB, 优化 Elasticsearch 数据同步任务

This commit is contained in:
xiongxiaoyang 2022-05-31 14:03:43 +08:00
parent 4d71aa33b1
commit 220068cd3a
6 changed files with 115 additions and 37 deletions

View File

@ -19,6 +19,7 @@
<spring.version>6.0.0-SNAPSHOT</spring.version> <spring.version>6.0.0-SNAPSHOT</spring.version>
<jjwt.version>0.11.5</jjwt.version> <jjwt.version>0.11.5</jjwt.version>
<elasticsearch.version>8.2.0</elasticsearch.version> <elasticsearch.version>8.2.0</elasticsearch.version>
<xxl-job.version>2.3.1</xxl-job.version>
</properties> </properties>
<dependencies> <dependencies>
<dependency> <dependency>
@ -116,6 +117,12 @@
<artifactId>spring-boot-starter-amqp</artifactId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>

View File

@ -0,0 +1,44 @@
package io.github.xxyopen.novel.core.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* XXL-JOB 配置类
*
* @author xiongxiaoyang
* @date 2022/5/31
*/
@Configuration
@Slf4j
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setLogPath(logPath);
return xxlJobSpringExecutor;
}
}

View File

@ -21,7 +21,7 @@ import org.springframework.stereotype.Component;
* @date 2022/5/25 * @date 2022/5/25
*/ */
@Component @Component
@ConditionalOnProperty(prefix = "spring.elasticsearch", name = "enable", havingValue = "true") @ConditionalOnProperty(prefix = "spring", name = {"elasticsearch.enable","amqp.enable"}, havingValue = "true")
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
public class RabbitQueueListener { public class RabbitQueueListener {

View File

@ -6,6 +6,8 @@ import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import io.github.xxyopen.novel.core.constant.DatabaseConsts; import io.github.xxyopen.novel.core.constant.DatabaseConsts;
import io.github.xxyopen.novel.core.constant.EsConsts; import io.github.xxyopen.novel.core.constant.EsConsts;
import io.github.xxyopen.novel.dao.entity.BookInfo; import io.github.xxyopen.novel.dao.entity.BookInfo;
@ -15,7 +17,6 @@ import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
@ -40,49 +41,54 @@ public class BookToEsTask {
* 每月凌晨做一次全量数据同步 * 每月凌晨做一次全量数据同步
*/ */
@SneakyThrows @SneakyThrows
@Scheduled(cron = "0 0 0 1 * ?") @XxlJob("saveToEsJobHandler")
public void saveToEs() { public ReturnT<String> saveToEs() {
QueryWrapper<BookInfo> queryWrapper = new QueryWrapper<>(); try {
List<BookInfo> bookInfos; QueryWrapper<BookInfo> queryWrapper = new QueryWrapper<>();
long maxId = 0; List<BookInfo> bookInfos;
for(;;) { long maxId = 0;
queryWrapper.clear(); for (; ; ) {
queryWrapper queryWrapper.clear();
.orderByAsc(DatabaseConsts.CommonColumnEnum.ID.getName()) queryWrapper
.gt(DatabaseConsts.CommonColumnEnum.ID.getName(), maxId) .orderByAsc(DatabaseConsts.CommonColumnEnum.ID.getName())
.last(DatabaseConsts.SqlEnum.LIMIT_30.getSql()); .gt(DatabaseConsts.CommonColumnEnum.ID.getName(), maxId)
bookInfos = bookInfoMapper.selectList(queryWrapper); .gt(DatabaseConsts.BookTable.COLUMN_WORD_COUNT, 0)
if (bookInfos.isEmpty()) { .last(DatabaseConsts.SqlEnum.LIMIT_30.getSql());
break; bookInfos = bookInfoMapper.selectList(queryWrapper);
} if (bookInfos.isEmpty()) {
BulkRequest.Builder br = new BulkRequest.Builder(); break;
}
BulkRequest.Builder br = new BulkRequest.Builder();
for (BookInfo book : bookInfos) { for (BookInfo book : bookInfos) {
br.operations(op -> op br.operations(op -> op
.index(idx -> idx .index(idx -> idx
.index(EsConsts.BookIndex.INDEX_NAME) .index(EsConsts.BookIndex.INDEX_NAME)
.id(book.getId().toString()) .id(book.getId().toString())
.document(EsBookDto.build(book)) .document(EsBookDto.build(book))
) )
).timeout(Time.of(t -> t.time("10s"))); ).timeout(Time.of(t -> t.time("10s")));
maxId = book.getId(); maxId = book.getId();
} }
BulkResponse result = elasticsearchClient.bulk(br.build()); BulkResponse result = elasticsearchClient.bulk(br.build());
// Log errors, if any // Log errors, if any
if (result.errors()) { if (result.errors()) {
log.error("Bulk had errors"); log.error("Bulk had errors");
for (BulkResponseItem item : result.items()) { for (BulkResponseItem item : result.items()) {
if (item.error() != null) { if (item.error() != null) {
log.error(item.error().reason()); log.error(item.error().reason());
}
} }
} }
} }
return ReturnT.SUCCESS;
} catch (Exception e) {
log.error(e.getMessage(), e);
return ReturnT.FAIL;
} }
} }
} }

View File

@ -109,6 +109,12 @@ public class EsSearchServiceImpl implements SearchService {
BoolQuery boolQuery = BoolQuery.of(b -> { BoolQuery boolQuery = BoolQuery.of(b -> {
// 只查有字数的小说
b.must(RangeQuery.of(m -> m
.field(EsConsts.BookIndex.FIELD_WORD_COUNT)
.gt(JsonData.of(0))
)._toQuery());
if (!StringUtils.isBlank(condition.getKeyword())) { if (!StringUtils.isBlank(condition.getKeyword())) {
// 关键词匹配 // 关键词匹配
b.must((q -> q.multiMatch(t -> t b.must((q -> q.multiMatch(t -> t

View File

@ -58,6 +58,21 @@ spring:
# 第一次和第二次重试之间的持续时间 # 第一次和第二次重试之间的持续时间
initial-interval: "3s" initial-interval: "3s"
# XXL-JOB 配置
xxl:
job:
admin:
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
addresses: http://127.0.0.1:8080/xxl-job-admin
executor:
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
appname: xxl-job-executor-novel
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: logs/xxl-job/jobhandler
### xxl-job, access token
accessToken: 123
--- ---
spring: spring:
config: config: