Monthly Archives: 六月 2017

JAVA并发包使用-线程池

JAVA提供了4种可用的线程池方法:

1,newSingleThreadExecutor():单例线程,连接池中只会存在一个线程
2,newFixedThreadPool(int nThread):固定数量的线程池,当超过现在数量,新的线程必须等待有线程被移除
3,newCacheedThreadPool():缓存线程池,当某个线程在创建时,先查看线程池中是否有存在的线程,没有则创建一个;可以设置线程的最大执行时长,默认为60s
4,newScheduledThreadPool():计划任务线程池,可以对线程设置周期执行

好处:线程池可用让程序更加专注于执行任务本身,而不必为线程的启动和关闭耗费时间

比如,有这么一个需求:在微信公众号中输入“订单”,查询出当天不同渠道的所有订单,这其实是一个很耗时的查询,若单个任务去执行,微信服务器可能等不了多久就直接返回“连接超时”了,此时想到的就是使用连接池分发多个任务去执行查询,最后将统计结果汇总

1,newFixedThreadPool来执行固定任务:

下面新建了一个固定大小的线程池,用来执行StatTask线程

ExecutorService pool = Executors.newFixedThreadPool(6);
Future<String> task1 = pool.submit(new StatTask(ORDER_TASK1, date));
Future<String> task2 = pool.submit(new StatTask(ORDER_TASK2, date));
Future<String> task3 = pool.submit(new StatTask(ORDER_TASK3, date));
Future<String> task4 = pool.submit(new StatTask(ORDER_TASK4, date));
Future<String> task5 = pool.submit(new StatTask(ORDER_TASK5, date));
Future<String> task6 = pool.submit(new StatTask(ORDER_TASK6, date));
pool.shutdown();

try {
	resultMap.put(ORDER_TASK1, task2.get(3, TimeUnit.SECONDS));
} catch (Exception e) {
	logger.error("error", e);
	resultMap.put(ORDER_TASK1, "0");
}
....

这里调用了pool.submit方法,表示有返回值的,submit里面的参数必须实现callable接口,返回Future对象,表中任务执行结果对象;若调用pool.execute方法则没有返回值,execute里面的参数需要实现Runnable接口

使用shutdown()方法,不再接受新的任务,以前的任务可以继续执行

Future.task方法为堵塞执行,参数可设置最大的执行时长,到时间则自动终止

class StatTask implements Callable<String> {
	private String taskName;
	private String date;

	StatTask(String taskName, String date) {
		this.taskName = taskName;
		this.date = date;
	}

	@Override
	public String call() throws Exception {
		long starttime = System.currentTimeMillis();
		if (ORDER_TASK1.equals(taskName)) {
			//处理订单1
		} else if (ORDER_TASK2.equals(taskName)) {
			//处理订单2
		}
		...
		return "0";
	}

}

StatTask实现Callable接口,并重写了call方法,在里面处理响应的查询逻辑,Callable使用了泛型,在call方法中返回自定义的类型

以上就新建了6个线程用来处理统计,最后将结果放入resultMap中,每个线程的最大等待时长为3s。

2,newScheduledThreadPool执行定时任务

ScheduledExecutorService pool = Executors.newScheduledThreadPool(20);
DataImportTask task = null;
for (int i=0;i<20;i++) {
	int start = i*300;
	int end = start+300;
	task = new DataImportTask("dataImport"+i,start,end);
	pool.scheduleWithFixedDelay(task, 0, 30, TimeUnit.MINUTES);
	//pool.schedule(task, 30, TimeUnit.MINUTES)
}

上面新建了20个定时任务用来导入数据

pool.scheduleWithFixedDelay()方法,第一个参数为任务类,需要继承Runnable接口,第二个参数为第一次执行的延时(纳秒/微妙/毫秒/秒/分/时),第二个参数对于第一次任务执行完成后在推迟多少时间执行,最后一个参数为时间的单位(纳秒/微妙/毫秒/秒/分/时)

pool.schedule表示只周期执行一次,第二个参数就表示第二次推迟的时间