多线程完成全量数据的更新,并测试

master
e 5 months ago
parent bd663e0433
commit 52cd528661

@ -1,10 +1,16 @@
package com.mashibing.cache;
import com.mashibing.entity.Air;
import com.mashibing.service.AirService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.stereotype.Repository;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -17,7 +23,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class AirCache implements InitializingBean {
// 一个只有一个工作线程,并且可以执行的定时任务的线程池。
private static final ScheduledExecutorService AIR_POOL = Executors.newSingleThreadScheduledExecutor();
private static final ScheduledExecutorService AIR_TASK_POOL = Executors.newSingleThreadScheduledExecutor();
// 每个线程查询的数据条数
private static final long AIR_COUNT = 5000;
// 摸鱼的方式构建一个线程
private static final ExecutorService AIR_FIND_POOL = Executors.newFixedThreadPool(10);
// JVM缓存 写读
private static final ConcurrentHashMap<Long,Air> AIR_CACHE = new ConcurrentHashMap<Long, Air>(128);
@ -27,15 +39,53 @@ public class AirCache implements InitializingBean {
private static final ReentrantReadWriteLock.WriteLock WRITE_LOCK = READ_WRITE_LOCK.writeLock();
private static final ReentrantReadWriteLock.ReadLock READ_LOCK = READ_WRITE_LOCK.readLock();
// 查询数据库数据
@Autowired
private AirService airService;
@Override
public void afterPropertiesSet() throws Exception {
// scheduleAtFixedRate 每次间隔3s但是任务的执行时间也会算到这3s内。
// scheduleWithFixedDelay 每次间隔3s等待上个任务完成后间隔3s再执行下次任务。
AIR_POOL.scheduleWithFixedDelay(() -> {
AIR_TASK_POOL.scheduleWithFixedDelay(() -> {
// 这里做全量更新
// 1、查询数据库中的全部数据 TODO 后面全量查询数据库
Map<Long,Air> allData = new HashMap<>();
// 2、同步到JVM缓存中。
// 1、查询数据库中的全部数据
// 查询数据的总条数
long count = airService.findCount();
long taskCount = (count % AIR_COUNT) == 0 ? count / AIR_COUNT : count / AIR_COUNT + 1;
List<Future<List<Air>>> taskList = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
long offset = i * AIR_COUNT;
Future<List<Air>> airListFuture = AIR_FIND_POOL.submit(() -> {
List<Air> list = airService.findLimit(offset, AIR_COUNT);
return list;
});
taskList.add(airListFuture);
}
// 根据数据条数做好每个线程查询的数据
// 每个线程都查询完毕后将数据整合到map中。
ConcurrentHashMap<Long,Air> allData = new ConcurrentHashMap<>();
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
for (Future future : taskList) {
try {
// 这里获取线程池任务的结果,结果到手后,才会停止阻塞。
List<Air> airList = (List<Air>) future.get();
AIR_FIND_POOL.execute(() -> {
for (Air air : airList) {
allData.put(air.getId(),air);
}
countDownLatch.countDown();
});
} catch (Exception e) {
e.printStackTrace();
}
}
// 2、等到前面的put操作的线程全部ok这里再同步到JVM缓存中。
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
writeAll(allData);
},0,3, TimeUnit.SECONDS);
}

@ -4,6 +4,8 @@ import com.mashibing.entity.Air;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
* @author zjw
* @description
@ -12,4 +14,10 @@ public interface AirMapper {
@Select("select * from air where id = #{id}")
Air findById(@Param("id") Long id);
@Select("select count(1) from air")
long findCount();
@Select("select * from air limit #{offset},#{size}")
List<Air> findByLimit(@Param("offset") long offset, @Param("size") long size);
}

@ -2,10 +2,16 @@ package com.mashibing.service;
import com.mashibing.entity.Air;
import java.util.List;
/**
* @author zjw
* @description
*/
public interface AirService {
Air getById(Long id);
long findCount();
List<Air> findLimit(long offset, long count);
}

@ -5,11 +5,13 @@ import com.mashibing.entity.Air;
import com.mashibing.mapper.AirMapper;
import com.mashibing.service.AirService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* @author zjw
@ -25,6 +27,7 @@ public class AirServiceImpl implements AirService {
private RedisTemplate redisTemplate;
@Autowired
@Lazy
private AirCache airCache;
private final String AIR_PREFIX = "air:";
@ -45,4 +48,14 @@ public class AirServiceImpl implements AirService {
}
return air;
}
@Override
public long findCount() {
return airMapper.findCount();
}
@Override
public List<Air> findLimit(long offset, long size) {
return airMapper.findByLimit(offset, size);
}
}

Loading…
Cancel
Save