将之前串行处理优化为多线程推送

master
e 6 months ago
parent e80b37a25c
commit 7dcd35ec85

@ -4,6 +4,7 @@ import com.mashibing.entity.Air;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.Date;
import java.util.List;
/**
@ -15,5 +16,15 @@ public interface AirMapper {
@Select("select * from air")
List<Air> findAll();
// 提供一个查询需要推送数据的SQL总条数
@Select("select count(1) from air where monitor_time = #{monitorTime}")
long selectCountNeedPush(@Param("monitorTime")String monitorTime);
// 查询今日第一个需要推送的数据,目的是获取ID方便计算
@Select("select id from air where monitor_time = #{monitorTime} order by id limit 1")
Long selectOneNeedPush(@Param("monitorTime")String monitorTime);
// 查询需要推送的数据的内容基于ID范围去查询。
@Select("select * from air where monitor_time = #{monitorTime} and id between #{fromId} and #{toId} order by id")
List<Air> selectByIdAndLimit(@Param("monitorTime")String monitorTime,@Param("fromId") long fromId,@Param("toId") long toId);
}

@ -1,11 +1,13 @@
package com.mashibing.service;
import java.util.concurrent.ExecutionException;
/**
* @author zjw
* @description
*/
public interface PushService {
void pushData();
void pushData(String date) throws ExecutionException, InterruptedException;
}

@ -11,6 +11,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author zjw
@ -20,16 +21,72 @@ import java.util.List;
@Slf4j
public class PushServiceImpl implements PushService {
private final int COUNT = 10000;
private final int COUNT_ID = 9999;
private final ExecutorService executor = Executors.newFixedThreadPool(50);
@Autowired
private PushUtil pushUtil;
@Resource
private AirMapper airMapper;
/**
* 线
*/
@Override
public void pushData(String date) throws ExecutionException, InterruptedException {
// 将数据计算做好。
//1.1 获取到今天需要推送数据的总数
long totalCount = airMapper.selectCountNeedPush(date);
System.out.println(totalCount);
//1.2 查询今日需要推送的数据的第一个id
Long firstId = airMapper.selectOneNeedPush(date);
//1.3 计算投递任务的个数
long times = totalCount % COUNT == 0 ? totalCount / COUNT : totalCount / COUNT + 1;
List<Future<String>> futureList = new ArrayList<>();
//1.4 开始计算每个线程推送的数据
for (int i = 0; i < times; i++) {
// 计算好每次每个线程需要查询的数据范围
long fromId = firstId;
long toId = firstId + COUNT_ID;
firstId = firstId + COUNT;
//2.1 基于线程池投递数据
Future<String> future = executor.submit(() -> {
//2.2 查询当前线程需要推送的数据
List<Air> airList = airMapper.selectByIdAndLimit(date, fromId, toId);
//2.3 推送数据
for (Air air : airList) {
boolean success = pushUtil.push(air);
if (!success) {
// 3.1 代表推送失败,需要落库做好数据的留存,后面该重试
}
}
log.info("【推送数据】 当前线程推送完毕数据。");
return "OK";
});
futureList.add(future);
}
for (Future future : futureList) {
future.get();
}
// 3.2 差选推送失败的数据库表的中信息,可以再次尝试推送(这里推送成功的,可以直接删除,失败的继续留存。)
CopyOnWriteArrayList<Air> airs = pushUtil.get();
System.out.println(airs.size());
}
/*@Override
public void pushData() {
long start = System.currentTimeMillis();
// 推送失败数据
List<Air> errorData = new ArrayList<>();
// 查询需要推送数据。
List<Air> airList = airMapper.findAll();
log.info("【推送数据】 推送air数据条数 = {}",airList.size());
for (Air air : airList) {
@ -41,5 +98,5 @@ public class PushServiceImpl implements PushService {
}
long end = System.currentTimeMillis();
log.info("【推送数据】 推送air数据条数 = {},花费时间 = {}",airList.size() - errorData.size(),(end - start) / 1000);
}
}*/
}

@ -3,6 +3,8 @@ package com.mashibing.util;
import com.mashibing.entity.Air;
import org.springframework.stereotype.Component;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author zjw
* @description
@ -10,14 +12,15 @@ import org.springframework.stereotype.Component;
@Component
public class PushUtil {
CopyOnWriteArrayList<Air> list = new CopyOnWriteArrayList();
public boolean push(Air air){
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
list.add(air);
return true;
}
public CopyOnWriteArrayList<Air> get(){
return list;
}
}

@ -0,0 +1,43 @@
package com.mashibing.mapper;
import com.mashibing.entity.Air;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.List;
import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@SpringBootTest
public class AirMapperTest {
@Resource
private AirMapper airMapper;
@Test
public void findAll() {
}
@Test
public void selectCountNeedPush() {
long count = airMapper.selectCountNeedPush("2024-05-21");
System.out.println(count);
}
@Test
public void selectOneNeedPush() {
Long id = airMapper.selectOneNeedPush("2024-05-21");
System.out.println(id);
}
@Test
public void selectByIdAndLimit() {
List<Air> airList = airMapper.selectByIdAndLimit("2024-05-21", 67, 10066);
System.out.println(airList.size());
}
}

@ -6,6 +6,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@ -16,7 +18,7 @@ public class PushServiceTest {
private PushService pushService;
@Test
public void pushData() {
pushService.pushData();
public void pushData() throws ExecutionException, InterruptedException {
pushService.pushData("2024-05-21");
}
}
Loading…
Cancel
Save