/* * Copyright 2010-2013 Institut Pasteur. * * This file is part of Icy. * * Icy is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Icy is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Icy. If not, see <http://www.gnu.org/licenses/>. */ package icy.system.thread; import icy.main.Icy; import icy.system.SystemUtil; import java.util.ArrayList; import java.util.EventListener; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.swing.event.EventListenerList; /** * @author stephane */ public class Processor extends ThreadPoolExecutor { public static final int DEFAULT_MAX_WAITING = 1024; public static final int DEFAULT_MAX_PROCESSING = SystemUtil.getAvailableProcessors() * 2; public interface ProcessorEventListener extends EventListener { public void processDone(Processor source, Runnable runnable); } class ProcessorThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { final Thread result = new Thread(r, defaultThreadName); result.setPriority(priority); return result; } } protected class RunnableAdapter implements Runnable { private final Runnable task; private final boolean onEDT; public RunnableAdapter(Runnable runnable, boolean onEDT) { super(); task = runnable; this.onEDT = onEDT; } @Override public void run() { if (task != null) { if (onEDT) ThreadUtil.invokeNow(task); else task.run(); } } /** * @return the task */ public Runnable getTask() { return task; } } protected class CallableAdapter<T> implements Callable<T> { private final Callable<T> task; private final boolean onEDT; public CallableAdapter(Callable<T> task, boolean onEDT) { super(); this.task = task; this.onEDT = onEDT; } /** * @return the task */ public Callable<T> getTask() { return task; } @Override public T call() throws Exception { if (task != null) { if (onEDT) return ThreadUtil.invokeNow(task); return task.call(); } return null; } } protected class FutureTaskAdapter<T> extends FutureTask<T> { private final Runnable runnable; private final Callable<T> callable; public FutureTaskAdapter(Runnable runnable, T result, boolean onEDT) { super(new RunnableAdapter(runnable, onEDT), result); this.runnable = runnable; this.callable = null; } public FutureTaskAdapter(Runnable runnable, boolean onEDT) { this(runnable, null, onEDT); } public FutureTaskAdapter(Callable<T> callable, boolean onEDT) { super(new CallableAdapter<T>(callable, onEDT)); this.runnable = null; this.callable = callable; } public Runnable getRunnable() { return runnable; } public Callable<T> getCallable() { return callable; } } /** * The minimum priority that a thread can have. */ public final static int MIN_PRIORITY = Thread.MIN_PRIORITY; /** * The default priority that is assigned to a thread. */ public final static int NORM_PRIORITY = Thread.NORM_PRIORITY; /** * The maximum priority that a thread can have. */ public final static int MAX_PRIORITY = Thread.MAX_PRIORITY; /** * parameters */ int priority; String defaultThreadName; /** * listeners */ private final EventListenerList listeners; /** * internal */ protected Runnable waitingExecution; protected long lastAdd; /** * Create a new Processor with specified number of maximum waiting and processing tasks.<br> * * @param priority * Processor priority<br> * <code>Processor.MIN_PRIORITY</code><br> * <code>Processor.NORM_PRIORITY</code><br> * <code>Processor.MAX_PRIORITY</code> */ public Processor(int maxWaiting, int maxProcessing, int priority) { super(maxProcessing, maxProcessing, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(maxWaiting)); setThreadFactory(new ProcessorThreadFactory()); allowCoreThreadTimeOut(true); setKeepAliveTime(2, TimeUnit.SECONDS); this.priority = priority; defaultThreadName = "Processor"; listeners = new EventListenerList(); waitingExecution = null; } /** * Create a new Processor with specified number of maximum waiting and processing tasks. */ public Processor(int maxWaiting, int maxProcessing) { this(maxWaiting, maxProcessing, NORM_PRIORITY); } /** * Create a new Processor with default number of maximum waiting and processing tasks. */ public Processor() { this(DEFAULT_MAX_WAITING, DEFAULT_MAX_PROCESSING); } /** * @deprecated Use {@link #removeFirstWaitingTask(Runnable)} instead. */ @Deprecated public boolean removeTask(Runnable task) { return removeFirstWaitingTask(task); } /** * @deprecated Use {@link #submit(Runnable, boolean)} instead. */ @SuppressWarnings("unused") @Deprecated public boolean addTask(Runnable task, boolean onEDT, int id) { return addTask(task, onEDT); } /** * @deprecated Use {@link #submit(Runnable, boolean)} instead. */ @Deprecated public boolean addTask(Runnable task, boolean onEDT) { try { submit(task, onEDT); } catch (RejectedExecutionException E) { return false; } return true; } /** * @deprecated Use {@link #submit(Runnable)} instead. */ @Deprecated public boolean addTask(Runnable task) { return addTask(task, false); } @Override public boolean remove(Runnable task) { // don't forget to remove the reference here if (waitingExecution == task) waitingExecution = null; return super.remove(task); } /** * Returns a <tt>RunnableFuture</tt> for the given runnable and default * value. * * @param runnable * the runnable task being wrapped * @param value * the default value for the returned future * @param onEDT * if set to <code>true</code> then the <tt>RunnableFuture</tt> will be executed on the * Swing EDT. * @return a <tt>RunnableFuture</tt> which when run will run the underlying runnable and which, * as a <tt>Future</tt>, will yield the given value as its result and provide for * cancellation of the underlying task. */ protected <T> FutureTaskAdapter<T> newTaskFor(Runnable runnable, T value, boolean onEDT) { return new FutureTaskAdapter<T>(runnable, value, onEDT); }; /** * Returns a <tt>RunnableFuture</tt> for the given callable task. * * @param callable * the callable task being wrapped * @param onEDT * if set to <code>true</code> then the <tt>RunnableFuture</tt> will be executed on the * Swing EDT. * @return a <tt>RunnableFuture</tt> which when run will call the * underlying callable and which, as a <tt>Future</tt>, will yield * the callable's result as its result and provide for * cancellation of the underlying task. * @since 1.6 */ protected <T> FutureTaskAdapter<T> newTaskFor(Callable<T> callable, boolean onEDT) { return new FutureTaskAdapter<T>(callable, onEDT); } /** * Submits a task for execution and returns a Future representing that task. The * Future's <tt>get</tt> method will return the given result upon successful completion. * * @param task * the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException * if the task cannot be scheduled for execution * @throws NullPointerException * if the task is null */ protected synchronized <T> Future<T> submit(FutureTaskAdapter<T> task) { if (task == null) throw new NullPointerException(); try { execute(task); } catch (RejectedExecutionException e) { if (!Icy.isExiting()) { // error while adding task System.err.println("Cannot add new task, ignore execution : " + task); // TODO: may be better to throw the RejectedExecutionException exception return null; } } waitingExecution = task; return task; } /** * Submits a Runnable task for execution and returns a Future representing that task. The * Future's <tt>get</tt> method will return <tt>null</tt> upon <em>successful</em> completion. * * @param task * the task to submit * @param onEDT * if set to <code>true</code> then the <tt>RunnableFuture</tt> will be executed on the * Swing EDT. * @return a Future representing pending completion of the task * @throws RejectedExecutionException * if the task cannot be scheduled for execution * @throws NullPointerException * if the task is null */ public Future<?> submit(Runnable task, boolean onEDT) { if (task == null) throw new NullPointerException(); return submit(newTaskFor(task, null, onEDT)); } /** * Submits a Runnable task for execution and returns a Future representing that task. The * Future's <tt>get</tt> method will return the given result upon successful completion. * * @param task * the task to submit * @param result * the result to return * @param onEDT * if set to <code>true</code> then the <tt>RunnableFuture</tt> will be executed on the * Swing EDT. * @return a Future representing pending completion of the task * @throws RejectedExecutionException * if the task cannot be scheduled for execution * @throws NullPointerException * if the task is null */ public <T> Future<T> submit(Runnable task, T result, boolean onEDT) { if (task == null) throw new NullPointerException(); return submit(newTaskFor(task, result, onEDT)); } /** * Submits a value-returning task for execution and returns a Future representing the pending * results of the task. The Future's <tt>get</tt> method will return the task's result upon * successful completion. * <p> * If you would like to immediately block waiting for a task, you can use constructions of the * form <tt>result = exec.submit(aCallable).get();</tt> * <p> * Note: The {@link Executors} class includes a set of methods that can convert some other * common closure-like objects, for example, {@link java.security.PrivilegedAction} to * {@link Callable} form so they can be submitted. * * @param task * the task to submit * @param onEDT * if set to <code>true</code> then the <tt>RunnableFuture</tt> will be executed on the * Swing EDT. * @return a Future representing pending completion of the task * @throws RejectedExecutionException * if the task cannot be scheduled for execution * @throws NullPointerException * if the task is null */ public <T> Future<T> submit(Callable<T> task, boolean onEDT) { if (task == null) throw new NullPointerException(); return submit(newTaskFor(task, onEDT)); } @Override public Future<?> submit(Runnable task) { return submit(task, false); } @Override public <T> Future<T> submit(Runnable task, T result) { return submit(task, result, false); } @Override public <T> Future<T> submit(Callable<T> task) { return submit(task, false); } /** * Return true if one or more process are executing or we still have waiting tasks. */ public boolean isProcessing() { return (getActiveCount() > 0) || hasWaitingTasks(); } /** * Wait for all tasks completion */ public void waitAll() { while (isProcessing()) ThreadUtil.sleep(1); } /** * shutdown and wait current tasks completion */ public void shutdownAndWait() { shutdown(); while (!isTerminated()) ThreadUtil.sleep(1); } /** * @return the priority */ public int getPriority() { return priority; } /** * @param priority * the priority to set */ public void setPriority(int priority) { this.priority = priority; } public String getDefaultThreadName() { return defaultThreadName; } public void setDefaultThreadName(String defaultThreadName) { this.defaultThreadName = defaultThreadName; } /** * Get the number of free slot in queue */ public int getFreeSlotNumber() { return getQueue().remainingCapacity(); } /** * Return true if queue is full */ public boolean isFull() { return getFreeSlotNumber() == 0; } /** * Return waiting tasks */ protected List<FutureTaskAdapter<?>> getWaitingTasks() { final BlockingQueue<Runnable> q = getQueue(); final List<FutureTaskAdapter<?>> result = new ArrayList<Processor.FutureTaskAdapter<?>>(); synchronized (q) { for (Runnable r : q) if (r instanceof FutureTaskAdapter<?>) result.add((FutureTaskAdapter<?>) r); } return result; } /** * Return waiting tasks for the specified Runnable instance */ protected List<FutureTaskAdapter<?>> getWaitingTasks(Runnable task) { final List<FutureTaskAdapter<?>> result = new ArrayList<Processor.FutureTaskAdapter<?>>(); // scan all tasks for (FutureTaskAdapter<?> f : getWaitingTasks()) if (f.getRunnable() == task) result.add(f); return result; } /** * Return waiting tasks for the specified Callable instance */ protected List<FutureTaskAdapter<?>> getWaitingTasks(Callable<?> task) { final List<FutureTaskAdapter<?>> result = new ArrayList<Processor.FutureTaskAdapter<?>>(); // scan all tasks for (FutureTaskAdapter<?> f : getWaitingTasks()) if (f.getCallable() == task) result.add(f); return result; } /** * Return the number of waiting task */ public int getWaitingTasksCount() { final int result = getQueue().size(); // TODO : be sure that waitingExecution pass to null when task has been taken in account. // Queue can be empty right after a task submission. // For this particular case we return 1 if a task has been submitted // and not taken in account with a timeout of 1 second. if ((result == 0) && ((waitingExecution != null) && ((System.currentTimeMillis() - lastAdd) < 1000))) return 1; return result; } /** * @deprecated Not anymore supported.<br> * Use {@link #getWaitingTasksCount(Callable)} or * {@link #getWaitingTasksCount(Runnable)} instead. */ @SuppressWarnings("unused") @Deprecated public int getWaitingTasksCount(int id) { return 0; } /** * Return the number of task waiting in queue for the specified <tt>Runnable</tt> instance. */ public int getWaitingTasksCount(Runnable task) { int result = 0; for (FutureTaskAdapter<?> f : getWaitingTasks()) if (f.getRunnable() == task) result++; return result; } /** * Return the number of task waiting in queue for the specified <tt>Callable</tt> instance. */ public int getWaitingTasksCount(Callable<?> task) { int result = 0; for (FutureTaskAdapter<?> f : getWaitingTasks()) if (f.getCallable() == task) result++; return result; } /** * Return true if we have at least one task waiting in queue */ public boolean hasWaitingTasks() { return (getWaitingTasksCount() > 0); } /** * @deprecated Not anymore supported.<br> * Use {@link #hasWaitingTasks(Callable)} or {@link #hasWaitingTasks(Runnable)} * instead. */ @SuppressWarnings("unused") @Deprecated public boolean hasWaitingTasks(int id) { return false; } /** * Return true if we have at least one task in queue for the specified <tt>Runnable</tt> * instance. */ public boolean hasWaitingTasks(Runnable task) { // scan all tasks for (FutureTaskAdapter<?> f : getWaitingTasks()) if (f.getRunnable() == task) return true; return false; } /** * Return true if we have at least one task in queue for the specified <tt>Callable</tt> * instance. */ public boolean hasWaitingTasks(Callable<?> task) { // scan all tasks for (FutureTaskAdapter<?> f : getWaitingTasks()) if (f.getCallable() == task) return true; return false; } /** * @deprecated Not anymore supported.<br> * USe {@link #removeFirstWaitingTask(Runnable)} or * {@link #removeFirstWaitingTask(Callable)} instead. */ @SuppressWarnings("unused") @Deprecated public boolean removeFirstWaitingTask(int id) { return false; } /** * Remove first waiting task for the specified <tt>Runnable</tt> instance. */ public boolean removeFirstWaitingTask(FutureTaskAdapter<?> task) { if (task == null) return false; if (task.getCallable() != null) return removeFirstWaitingTask(task.getCallable()); if (task.getRunnable() != null) return removeFirstWaitingTask(task.getRunnable()); return false; } /** * Remove first waiting task for the specified <tt>Runnable</tt> instance. */ public boolean removeFirstWaitingTask(Runnable task) { synchronized (getQueue()) { // remove first task of specified instance for (FutureTaskAdapter<?> f : getWaitingTasks()) if (f.getRunnable() == task) return remove(f); } return false; } /** * Remove first waiting task for the specified <tt>Callable</tt> instance. */ public boolean removeFirstWaitingTask(Callable<?> task) { synchronized (getQueue()) { // remove first task of specified instance for (FutureTaskAdapter<?> f : getWaitingTasks()) if (f.getCallable() == task) return remove(f); } return false; } /** * @deprecated Not anymore supported.<br> * USe {@link #removeWaitingTasks(Runnable)} or * {@link #removeWaitingTasks(Callable)} instead. */ @Deprecated @SuppressWarnings("unused") public boolean removeWaitingTasks(int id) { return false; } /** * Remove all waiting tasks for the specified <tt>Runnable</tt> instance. */ public boolean removeWaitingTasks(Runnable task) { boolean result = false; synchronized (getQueue()) { // remove all tasks of specified instance for (FutureTaskAdapter<?> f : getWaitingTasks(task)) result |= remove(f); } return result; } /** * Remove all waiting tasks for the specified <tt>Callable</tt> instance. */ public boolean removeWaitingTasks(Callable<?> task) { boolean result = false; synchronized (getQueue()) { // remove all tasks of specified instance for (FutureTaskAdapter<?> f : getWaitingTasks(task)) result |= remove(f); } return result; } /** * Clear all waiting tasks */ public void removeAllWaitingTasks() { waitingExecution = null; synchronized (getQueue()) { // remove all tasks getQueue().clear(); } } /** * @deprecated This method is useless. */ @Deprecated public void limitWaitingTask(Runnable task, int value) { synchronized (getQueue()) { final List<FutureTaskAdapter<?>> tasks = getWaitingTasks(task); final int numToRemove = tasks.size() - value; for (int i = 0; i < numToRemove; i++) remove(tasks.get(i)); } } /** * @deprecated Not anymore supported ! */ @SuppressWarnings("unused") @Deprecated public void limitWaitingTask(int id, int value) { // not anymore supported } /** * @deprecated This method is useless. */ @Deprecated public boolean limitWaitingTask(int value) { synchronized (getQueue()) { final List<FutureTaskAdapter<?>> tasks = getWaitingTasks(); final int numToRemove = tasks.size() - value; for (int i = 0; i < numToRemove; i++) remove(tasks.get(i)); } return false; } /** * Add a listener * * @param listener */ public void addListener(ProcessorEventListener listener) { listeners.add(ProcessorEventListener.class, listener); } /** * Remove a listener * * @param listener */ public void removeListener(ProcessorEventListener listener) { listeners.remove(ProcessorEventListener.class, listener); } /** * fire event * * @param task */ public void fireDoneEvent(FutureTaskAdapter<?> task) { for (ProcessorEventListener listener : listeners.getListeners(ProcessorEventListener.class)) listener.processDone(this, task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); // notify we just achieved a process fireDoneEvent((FutureTaskAdapter<?>) r); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); // ok we can remove reference... waitingExecution = null; } }