fix(novel-crawl): 解决多个爬虫进程间的爬虫源状态冲突问题

This commit is contained in:
xiongxiaoyang 2025-05-13 10:45:38 +08:00
parent 1f53b56bd6
commit a07643bde0
2 changed files with 17 additions and 78 deletions

View File

@ -1,61 +0,0 @@
package com.java2nb.novel.core.schedule;
import com.java2nb.novel.core.cache.CacheKey;
import com.java2nb.novel.core.cache.CacheService;
import com.java2nb.novel.entity.CrawlSource;
import com.java2nb.novel.service.CrawlService;
import io.github.xxyopen.util.ThreadUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Set;
/**
* 爬虫线程监控器,监控执行完成的爬虫源并修改状态
*
* @author Administrator
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class CrawlThreadMonitor {
private final CacheService cacheService;
private final CrawlService crawlService;
@Scheduled(fixedRate = 1000 * 60 * 5)
public void monitor() {
//查询需要监控的正在运行的爬虫源
List<CrawlSource> sources = crawlService.queryCrawlSourceByStatus((byte) 1);
for (CrawlSource source : sources) {
Set<Long> runningCrawlThreadIds = (Set<Long>) cacheService.getObject(CacheKey.RUNNING_CRAWL_THREAD_KEY_PREFIX + source.getId());
boolean sourceStop = true;
if (runningCrawlThreadIds != null) {
for (Long threadId : runningCrawlThreadIds) {
Thread thread = ThreadUtil.findThread(threadId);
if (thread != null && thread.isAlive()) {
//有活跃线程说明该爬虫源正在运行数据库中状态正确不需要修改
sourceStop = false;
}
}
}
if (sourceStop) {
crawlService.updateCrawlSourceStatus(source.getId(), (byte) 0);
}
}
}
}

View File

@ -66,6 +66,8 @@ public class CrawlServiceImpl implements CrawlService {
private final CrawlHttpClient crawlHttpClient; private final CrawlHttpClient crawlHttpClient;
private final Map<Integer, Byte> crawlSourceStatusMap = new HashMap<>();
@Override @Override
public void addCrawlSource(CrawlSource source) { public void addCrawlSource(CrawlSource source) {
@ -104,6 +106,8 @@ public class CrawlServiceImpl implements CrawlService {
.build() .build()
.render(RenderingStrategies.MYBATIS3); .render(RenderingStrategies.MYBATIS3);
List<CrawlSource> crawlSources = crawlSourceMapper.selectMany(render); List<CrawlSource> crawlSources = crawlSourceMapper.selectMany(render);
crawlSources.forEach(crawlSource -> crawlSource.setSourceStatus(
Optional.ofNullable(crawlSourceStatusMap.get(crawlSource.getId())).orElse((byte) 0)));
PageBean<CrawlSource> pageBean = PageBuilder.build(crawlSources); PageBean<CrawlSource> pageBean = PageBuilder.build(crawlSources);
pageBean.setList(BeanUtil.copyList(crawlSources, CrawlSourceVO.class)); pageBean.setList(BeanUtil.copyList(crawlSources, CrawlSourceVO.class));
return pageBean; return pageBean;
@ -113,12 +117,12 @@ public class CrawlServiceImpl implements CrawlService {
@Override @Override
public void openOrCloseCrawl(Integer sourceId, Byte sourceStatus) { public void openOrCloseCrawl(Integer sourceId, Byte sourceStatus) {
//判断是开启还是关闭如果是关闭修改数据库状态后获取该爬虫正在运行的线程集合并全部停止 // 判断是开启还是关闭如果是关闭获取该爬虫源正在运行的线程集合并全部中断
//如果是开启查询数据库中状态判断该爬虫源是否还在运行如果在运行则忽略 // 如果是开启判断该爬虫源是否还在运行如果在运行则忽略如果没有运行则启动线程爬取小说数据并加入到runningCrawlThread中
// 如果没有则修改数据库状态并启动线程爬取小说数据加入到runningCrawlThread中 // 最后保存爬虫源状态
if (sourceStatus == (byte) 0) { if (sourceStatus == (byte) 0) {
//关闭,直接修改数据库状态并直接修改数据库状态后获取该爬虫正在运行的线程集合全部停止 // 关闭
SpringUtil.getBean(CrawlService.class).updateCrawlSourceStatus(sourceId, sourceStatus); // 将该爬虫源正在运行的线程集合全部停止
Set<Long> runningCrawlThreadId = (Set<Long>) cacheService.getObject( Set<Long> runningCrawlThreadId = (Set<Long>) cacheService.getObject(
CacheKey.RUNNING_CRAWL_THREAD_KEY_PREFIX + sourceId); CacheKey.RUNNING_CRAWL_THREAD_KEY_PREFIX + sourceId);
if (runningCrawlThreadId != null) { if (runningCrawlThreadId != null) {
@ -132,16 +136,13 @@ public class CrawlServiceImpl implements CrawlService {
} else { } else {
//开启 // 开启
//查询爬虫源状态和规则 Byte realSourceStatus = Optional.ofNullable(crawlSourceStatusMap.get(sourceId)).orElse((byte) 0);
CrawlSource source = queryCrawlSource(sourceId);
Byte realSourceStatus = source.getSourceStatus();
if (realSourceStatus == (byte) 0) { if (realSourceStatus == (byte) 0) {
//该爬虫源已经停止运行了,修改数据库状态并启动线程爬取小说数据加入到runningCrawlThread中 // 查询爬虫源规则
SpringUtil.getBean(CrawlService.class).updateCrawlSourceStatus(sourceId, sourceStatus); CrawlSource source = queryCrawlSource(sourceId);
//该爬虫源已经停止运行了,启动线程爬取小说数据并将线程加入到runningCrawlThread中
RuleBean ruleBean = new ObjectMapper().readValue(source.getCrawlRule(), RuleBean.class); RuleBean ruleBean = new ObjectMapper().readValue(source.getCrawlRule(), RuleBean.class);
Set<Long> threadIds = new HashSet<>(); Set<Long> threadIds = new HashSet<>();
//按分类开始爬虫解析任务 //按分类开始爬虫解析任务
for (int i = 1; i < 8; i++) { for (int i = 1; i < 8; i++) {
@ -150,16 +151,15 @@ public class CrawlServiceImpl implements CrawlService {
thread.start(); thread.start();
//thread加入到监控缓存中 //thread加入到监控缓存中
threadIds.add(thread.getId()); threadIds.add(thread.getId());
} }
cacheService.setObject(CacheKey.RUNNING_CRAWL_THREAD_KEY_PREFIX + sourceId, threadIds); cacheService.setObject(CacheKey.RUNNING_CRAWL_THREAD_KEY_PREFIX + sourceId, threadIds);
} }
} }
// 保存爬虫源状态
crawlSourceStatusMap.put(sourceId, sourceStatus);
} }
@Override @Override