6-一次性定时任务

主要内容:
1.主页前端开发(列表组件抽象)
2.批量插入数据功能开发+经验分享
3.定时任务注解
4.测试及优化批量导入功能(涉及性能优化+并发偏程知识)

开发主页

直接 list 列表组件实现

模拟 1000 万个用户, 再去查询

导入数据

导入数据的方式

  1. 用可视化界面:适合一次性导入、数据量可控
  2. 写程序:for 循环,但是尽量分批进行(可以用接口来控制)。保证可控、幂等,注意线上环境和测试环境是有区别的
  3. 执行 SQL 语句:适用于小数据量

编写一次性任务

for 循环插入数据的特点:

  1. 频繁建立和释放数据库连接(用批量查询解决)
  2. for 循环是绝对线性的(可以并发提速)

注意:并发时不要用到非并发类的集合

获取核心线程数

1
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors() - 1;

建立执行器(线程池):

1
private ExecutorService executorService = new ThreadPoolExecutor(AVAILABLE_PROCESSORS, 1000, 10000, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10000));

连接池的参数设置

1
2
CPU 密集型:分配的核心线程数 = CPU - 1
IO 密集型:分配的核心线程数可以大于 CPU 核数

建立一个测试类实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.yupi.yupao.model.domain.User;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.StopWatch;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@SpringBootTest
public class InsertUsersTest {
@Resource
UserService userService;

// CPU 密集型:分配的核心线程数 = CPU - 1
// IO 密集型:分配的核心线程数可以大于 CPU 核数
private ExecutorService executorService = new ThreadPoolExecutor(40, 1000, 10000, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10000));

/**
* 批量插入用户
*/
@Test
public void doInsertUsers() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
final int INSERT_NUM = 100000;
List<User> userList = new ArrayList<>();
for (int i = 0; i < INSERT_NUM; i++) {
User user = new User();
user.setUsername("假墨枫");
user.setUserAccount("fuckmofeng");
user.setAvatarUrl("https://636f-codenav-8grj8px727565176-1256524210.tcb.qcloud.la/img/logo.png");
user.setGender(0);
user.setUserPassword("12345678");
user.setPhone("18963421945");
user.setTags("['男','java']");
user.setEmail("123456@qq.com");
user.setUserStatus(0);
user.setUserRole(0);
user.setPlanetCode("112111");
userList.add(user);
}
// 20 秒 10 万条
userService.saveBatch(userList, 10000);
stopWatch.stop();
System.out.println(stopWatch.getTotalTimeMillis());
}

}

并发插入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// CPU 密集型:分配的核心线程数 = CPU - 1
// IO 密集型:分配的核心线程数可以大于 CPU 核数
private ExecutorService executorService = new ThreadPoolExecutor(40, 1000, 10000, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10000));
/**
* 并发插入用户
*/
@Test
public void doConcurrencyInsertUsers() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
final int INSERT_NUM = 100000;

//分十组
int j = 0;
int batchSize = 5000;
//因为CompletableFuture实现了Future接口,我们先来回顾Future吧。
//Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。
List<CompletableFuture<Void>> futureList = new ArrayList<>();
for(int i = 0; i < 10; i++){
List<User> userList = new ArrayList<>();
while(true){
j++;
User user = new User();
user.setUsername("假墨枫");
user.setUserAccount("fuckmofeng");
user.setAvatarUrl("https://636f-codenav-8grj8px727565176-1256524210.tcb.qcloud.la/img/logo.png");
user.setGender(0);
user.setUserPassword("12345678");
user.setPhone("18963421945");
user.setTags("['女','java','python','c++']");
user.setEmail("123456@qq.com");
user.setUserStatus(0);
user.setUserRole(0);
user.setPlanetCode("112111");
userList.add(user);
if(j % batchSize == 0){
break;
}
}
//新建异步任务,异步执行
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("ThreadName:" + Thread.currentThread().getName());
userService.saveBatch(userList, batchSize);
},executorService);
futureList.add(future);
}
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})).join();
stopWatch.stop();
System.out.println(stopWatch.getTotalTimeMillis());
}