线程池工具类
大约 2 分钟
线程池工具类
拒绝策略
public class TheadRejectHandlerImpl implements RejectedExecutionHandler {
private static final Logger log = LoggerFactory.getLogger(TheadRejectHandlerImpl.class);
private String theadPoolName;
private boolean sendUmpAlarm;
public TheadRejectHandlerImpl(String theadPoolName,boolean sendUmpAlarm){
this.theadPoolName = theadPoolName;
this.sendUmpAlarm = sendUmpAlarm;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
String msg = String.format("线程池:%s 繁忙,改由调用方线程执行,排队任务数:%s,核心线程数:%s,最大线程数:%s,当前活跃线程数:%s,已完成任务数:%s !",
theadPoolName, executor.getQueue().size(),executor.getCorePoolSize(),
executor.getMaximumPoolSize(),executor.getActiveCount(),executor.getCompletedTaskCount());
log.warn(msg);
if (sendUmpAlarm){
log.info("###UMP sendAlarm key:{}", UmpKeyConstants.THREAD_POOL_BUSY);
Profiler.businessAlarm(UmpKeyConstants.THREAD_POOL_BUSY,msg);
}
if (!executor.isShutdown()) {
r.run();
}
}
}
工厂
public class ThreadFactoryImpl implements ThreadFactory {
/**
* 计数器
*/
private AtomicInteger counter = new AtomicInteger(0);
/**
* 线程名
*/
private String threadName;
/**
* constructor
* @param threadName
*/
public ThreadFactoryImpl(String threadName){
this.threadName = threadName;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r,threadName + "_" + this.counter.incrementAndGet());
}
}
工具类
@Slf4j
public class ThreadPoolUtil {
private static TheadPoolStatusReporter theadPoolStatusReporter;
public static class Builder{
private int corePoolSize = Runtime.getRuntime().availableProcessors()*2;
private int maximumPoolSize = Runtime.getRuntime().availableProcessors()*2;
private BlockingQueue<Runnable> workQueue;
private ThreadFactory threadFactory;
private RejectedExecutionHandler handler;
private long keepAliveTime = 60;
private TimeUnit unit = TimeUnit.SECONDS;
private final String threadPoolName;
private boolean busyAlarm;
private boolean reportPoolStatus;
public Builder(String threadPoolName){
this.threadPoolName = threadPoolName;
}
public Builder corePoolSize(Integer corePoolSize){
if(corePoolSize!=null) {
this.corePoolSize = corePoolSize;
}
return this;
}
public Builder maximumPoolSize(Integer maximumPoolSize){
if(maximumPoolSize!=null) {
this.maximumPoolSize = maximumPoolSize;
}
return this;
}
public Builder workQueue(BlockingQueue<Runnable> workQueue){
if(!CollectionUtils.isEmpty(workQueue)) {
this.workQueue = workQueue;
}
return this;
}
public Builder threadFactory(ThreadFactory threadFactory){
this.threadFactory =threadFactory;
return this;
}
public Builder handler(RejectedExecutionHandler handler){
this.handler =handler;
return this;
}
public Builder keepAliveTime(Long keepAliveTime){
if(keepAliveTime!=null) {
this.keepAliveTime = keepAliveTime;
}
return this;
}
public Builder busyAlarm(boolean busyAlarm){
this.busyAlarm =busyAlarm;
return this;
}
public Builder unit(TimeUnit unit){
this.unit =unit;
return this;
}
public Builder reportPoolStatus(boolean reportPoolStatus){
this.reportPoolStatus =reportPoolStatus;
return this;
}
public ThreadPoolExecutor build(){
if (workQueue == null){
workQueue = new LinkedBlockingQueue<>(50);
}
if (threadFactory==null){
threadFactory = new ThreadFactoryImpl(threadPoolName+"#Thead");
}
if (handler==null){
handler = new TheadRejectHandlerImpl(threadPoolName,busyAlarm);
}
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
if (reportPoolStatus){
if (theadPoolStatusReporter==null){
synchronized (ThreadPoolUtil.class){
if (theadPoolStatusReporter==null){
theadPoolStatusReporter = new TheadPoolStatusLogReporter();
}
}
}
theadPoolStatusReporter.add(threadPoolName,executor);
theadPoolStatusReporter.start();
}
log.info("线程池:{} 初始化完成 reportPoolStatus:{},busyAlarm:{}",threadPoolName,reportPoolStatus,busyAlarm);
return executor;
}
}
}
状态记录
public abstract class TheadPoolStatusReporter {
private Map<String,ThreadPoolExecutor> executorsMap;
private ScheduledExecutorService service;
private boolean start;
public TheadPoolStatusReporter() {
this.executorsMap = new ConcurrentHashMap<>();
this.service = Executors.newScheduledThreadPool(1);
}
public void add(String poolName,ThreadPoolExecutor executor){
executorsMap.put(poolName,executor);
}
public synchronized void start(){
if (!start){
service.scheduleAtFixedRate(this::monitor,5,60,TimeUnit.SECONDS);
start = true;
}
}
private void monitor(){
for (Map.Entry<String, ThreadPoolExecutor> entry : executorsMap.entrySet()) {
report(entry.getKey(),entry.getValue());
}
}
abstract void report(String poolName,ThreadPoolExecutor executor);
}
@Slf4j
public class TheadPoolStatusLogReporter extends TheadPoolStatusReporter {
@Override
void report(String theadPoolName,ThreadPoolExecutor executor) {
String msg = String.format("线程池:%s,排队任务数:%s,核心线程数:%s,最大线程数:%s,当前活跃线程数:%s,已完成任务数:%s !",
theadPoolName, executor.getQueue().size(),executor.getCorePoolSize(),
executor.getMaximumPoolSize(),executor.getActiveCount(),executor.getCompletedTaskCount());
log.info(msg);
}
}
