跳至主要內容

线程池工具类

程序员李某某大约 2 分钟

线程池工具类

拒绝策略

public class TheadRejectHandlerImpl implements RejectedExecutionHandler {
    private static final Logger log = LoggerFactory.getLogger(TheadRejectHandlerImpl.class);
    private String theadPoolName;
    private boolean sendUmpAlarm;
    public TheadRejectHandlerImpl(String theadPoolName,boolean sendUmpAlarm){
        this.theadPoolName = theadPoolName;
        this.sendUmpAlarm = sendUmpAlarm;
    }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        String msg = String.format("线程池:%s 繁忙,改由调用方线程执行,排队任务数:%s,核心线程数:%s,最大线程数:%s,当前活跃线程数:%s,已完成任务数:%s !",
                theadPoolName, executor.getQueue().size(),executor.getCorePoolSize(),
                executor.getMaximumPoolSize(),executor.getActiveCount(),executor.getCompletedTaskCount());
        log.warn(msg);
        if (sendUmpAlarm){
            log.info("###UMP sendAlarm key:{}", UmpKeyConstants.THREAD_POOL_BUSY);
            Profiler.businessAlarm(UmpKeyConstants.THREAD_POOL_BUSY,msg);
        }
        if (!executor.isShutdown()) {
            r.run();
        }
    }
}

工厂

public class ThreadFactoryImpl implements ThreadFactory {
    /**
     * 计数器
     */
    private AtomicInteger counter = new AtomicInteger(0);
    /**
     * 线程名
     */
    private String threadName;

    /**
     * constructor
     * @param threadName
     */
    public ThreadFactoryImpl(String threadName){
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r,threadName + "_" + this.counter.incrementAndGet());
    }
}

工具类

@Slf4j
public class ThreadPoolUtil {

    private static TheadPoolStatusReporter theadPoolStatusReporter;

    public static class Builder{
        private int corePoolSize = Runtime.getRuntime().availableProcessors()*2;
        private int maximumPoolSize = Runtime.getRuntime().availableProcessors()*2;
        private BlockingQueue<Runnable> workQueue;
        private ThreadFactory threadFactory;
        private RejectedExecutionHandler handler;
        private long keepAliveTime = 60;
        private TimeUnit unit = TimeUnit.SECONDS;
        private final String threadPoolName;
        private boolean busyAlarm;
        private boolean reportPoolStatus;

        public Builder(String threadPoolName){
            this.threadPoolName = threadPoolName;
        }
        public Builder corePoolSize(Integer corePoolSize){
            if(corePoolSize!=null) {
                this.corePoolSize = corePoolSize;
            }
            return this;
        }
        public Builder maximumPoolSize(Integer maximumPoolSize){
            if(maximumPoolSize!=null) {
                this.maximumPoolSize = maximumPoolSize;
            }
            return this;
        }
        public Builder workQueue(BlockingQueue<Runnable> workQueue){
            if(!CollectionUtils.isEmpty(workQueue)) {
                this.workQueue = workQueue;
            }
            return this;
        }
        public Builder threadFactory(ThreadFactory threadFactory){
            this.threadFactory =threadFactory;
            return this;
        }
        public Builder handler(RejectedExecutionHandler handler){
            this.handler =handler;
            return this;
        }
        public Builder keepAliveTime(Long keepAliveTime){
            if(keepAliveTime!=null) {
                this.keepAliveTime = keepAliveTime;
            }
            return this;
        }
        public Builder busyAlarm(boolean busyAlarm){
            this.busyAlarm =busyAlarm;
            return this;
        }
        public Builder unit(TimeUnit unit){
            this.unit =unit;
            return this;
        }
        public Builder reportPoolStatus(boolean reportPoolStatus){
            this.reportPoolStatus =reportPoolStatus;
            return this;
        }

        public ThreadPoolExecutor build(){
            if (workQueue == null){
                workQueue = new LinkedBlockingQueue<>(50);
            }
            if (threadFactory==null){
                threadFactory = new ThreadFactoryImpl(threadPoolName+"#Thead");
            }
            if (handler==null){
                handler = new TheadRejectHandlerImpl(threadPoolName,busyAlarm);
            }
            ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
            if (reportPoolStatus){
                if (theadPoolStatusReporter==null){
                    synchronized (ThreadPoolUtil.class){
                        if (theadPoolStatusReporter==null){
                            theadPoolStatusReporter = new TheadPoolStatusLogReporter();
                        }
                    }
                }
                theadPoolStatusReporter.add(threadPoolName,executor);
                theadPoolStatusReporter.start();
            }
            log.info("线程池:{} 初始化完成 reportPoolStatus:{},busyAlarm:{}",threadPoolName,reportPoolStatus,busyAlarm);
            return executor;
        }
    }
}

状态记录

public abstract class TheadPoolStatusReporter {

    private Map<String,ThreadPoolExecutor> executorsMap;
    private ScheduledExecutorService service;
    private boolean start;

    public TheadPoolStatusReporter() {
        this.executorsMap = new ConcurrentHashMap<>();
        this.service = Executors.newScheduledThreadPool(1);
    }
    public void add(String poolName,ThreadPoolExecutor executor){
        executorsMap.put(poolName,executor);
    }

    public synchronized void start(){
        if (!start){
            service.scheduleAtFixedRate(this::monitor,5,60,TimeUnit.SECONDS);
            start = true;
        }
    }

    private void monitor(){
        for (Map.Entry<String, ThreadPoolExecutor> entry : executorsMap.entrySet()) {
            report(entry.getKey(),entry.getValue());
        }
    }

    abstract void report(String poolName,ThreadPoolExecutor executor);
}
@Slf4j
public class TheadPoolStatusLogReporter extends TheadPoolStatusReporter {
    @Override
    void report(String theadPoolName,ThreadPoolExecutor executor) {
        String msg = String.format("线程池:%s,排队任务数:%s,核心线程数:%s,最大线程数:%s,当前活跃线程数:%s,已完成任务数:%s !",
                theadPoolName, executor.getQueue().size(),executor.getCorePoolSize(),
                executor.getMaximumPoolSize(),executor.getActiveCount(),executor.getCompletedTaskCount());
        log.info(msg);
    }
}

上次编辑于:
贡献者: ext.liyuanhao3