001/*
002 * Copyright 2010-2015 Institut Pasteur.
003 * 
004 * This file is part of Icy.
005 * 
006 * Icy is free software: you can redistribute it and/or modify
007 * it under the terms of the GNU General Public License as published by
008 * the Free Software Foundation, either version 3 of the License, or
009 * (at your option) any later version.
010 * 
011 * Icy is distributed in the hope that it will be useful,
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014 * GNU General Public License for more details.
015 * 
016 * You should have received a copy of the GNU General Public License
017 * along with Icy. If not, see <http://www.gnu.org/licenses/>.
018 */
019package icy.system.thread;
020
021import icy.main.Icy;
022import icy.system.IcyExceptionHandler;
023import icy.system.SystemUtil;
024
025import java.util.ArrayList;
026import java.util.EventListener;
027import java.util.List;
028import java.util.concurrent.BlockingQueue;
029import java.util.concurrent.Callable;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.FutureTask;
033import java.util.concurrent.LinkedBlockingQueue;
034import java.util.concurrent.RejectedExecutionException;
035import java.util.concurrent.RejectedExecutionHandler;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039
040/**
041 * Processor class.<br>
042 * Allow you to queue and execute tasks on a defined set of thread.
043 * 
044 * @author stephane
045 */
046public class Processor extends ThreadPoolExecutor
047{
048    public static final int DEFAULT_MAX_WAITING = 1024;
049    public static final int DEFAULT_MAX_PROCESSING = SystemUtil.getNumberOfCPUs();
050
051    /**
052     * @deprecated Useless interface
053     */
054    @Deprecated
055    public interface ProcessorEventListener extends EventListener
056    {
057        public void processDone(Processor source, Runnable runnable);
058    }
059
060    protected class ProcessorThreadFactory implements ThreadFactory
061    {
062        String name;
063
064        public ProcessorThreadFactory(String name)
065        {
066            super();
067
068            setName(name);
069        }
070
071        public String getName()
072        {
073            return name;
074        }
075
076        public void setName(String value)
077        {
078            this.name = value;
079        }
080
081        String getThreadName()
082        {
083            String result = name;
084
085            return result;
086        }
087
088        @Override
089        public Thread newThread(Runnable r)
090        {
091            final Thread result = new Thread(r, getThreadName());
092
093            result.setPriority(priority);
094
095            return result;
096        }
097    }
098
099    protected class ProcessorRejectedExecutionHandler implements RejectedExecutionHandler
100    {
101        @Override
102        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
103        {
104            // ignore if we try to submit process while Icy is exiting
105            if (!Icy.isExiting())
106                throw new RejectedExecutionException("Cannot add new task, ignore execution of " + r);
107        }
108    }
109
110    protected class FutureTaskAdapter<T> extends FutureTask<T>
111    {
112        public Runnable runnable;
113        public Callable<T> callable;
114        final boolean handleException;
115
116        public FutureTaskAdapter(Runnable runnable, T result, boolean handleException)
117        {
118            super(runnable, result);
119
120            this.runnable = runnable;
121            this.callable = null;
122            this.handleException = handleException;
123        }
124
125        public FutureTaskAdapter(Runnable runnable, boolean handleException)
126        {
127            this(runnable, null, handleException);
128        }
129
130        public FutureTaskAdapter(Callable<T> callable, boolean handleException)
131        {
132            super(callable);
133
134            this.runnable = null;
135            this.callable = callable;
136            this.handleException = handleException;
137        }
138
139        @Override
140        protected void done()
141        {
142            super.done();
143
144            if (handleException)
145            {
146                try
147                {
148                    get();
149                }
150                catch (Exception e)
151                {
152                    IcyExceptionHandler.handleException(e.getCause(), true);
153                }
154            }
155        }
156    }
157
158    /**
159     * @deprecated
160     */
161    @Deprecated
162    protected class RunnableAdapter implements Runnable
163    {
164        private final Runnable task;
165        private final boolean onEDT;
166
167        public RunnableAdapter(Runnable runnable, boolean onEDT)
168        {
169            super();
170
171            task = runnable;
172            this.onEDT = onEDT;
173        }
174
175        @Override
176        public void run()
177        {
178            if (task != null)
179            {
180                if (onEDT)
181                    ThreadUtil.invokeNow(task);
182                else
183                    task.run();
184            }
185        }
186
187        /**
188         * @return the task
189         */
190        public Runnable getTask()
191        {
192            return task;
193        }
194    }
195
196    /**
197     * @deprecated
198     */
199    @Deprecated
200    protected class CallableAdapter<T> implements Callable<T>
201    {
202        private final Callable<T> task;
203        private final boolean onEDT;
204
205        public CallableAdapter(Callable<T> task, boolean onEDT)
206        {
207            super();
208
209            this.task = task;
210            this.onEDT = onEDT;
211        }
212
213        /**
214         * @return the task
215         */
216        public Callable<T> getTask()
217        {
218            return task;
219        }
220
221        @Override
222        public T call() throws Exception
223        {
224            if (task != null)
225            {
226                if (onEDT)
227                    return ThreadUtil.invokeNow(task);
228
229                return task.call();
230            }
231
232            return null;
233        }
234    }
235
236    /**
237     * @deprecated
238     */
239    @Deprecated
240    protected class FutureTaskAdapterEDT<T> extends FutureTaskAdapter<T>
241    {
242        public FutureTaskAdapterEDT(Runnable runnable, T result, boolean onEDT)
243        {
244            super(new RunnableAdapter(runnable, onEDT), result, true);
245
246            // assign the original runnable
247            this.runnable = runnable;
248            this.callable = null;
249        }
250
251        public FutureTaskAdapterEDT(Runnable runnable, boolean onEDT)
252        {
253            this(runnable, null, onEDT);
254        }
255
256        public FutureTaskAdapterEDT(Callable<T> callable, boolean onEDT)
257        {
258            super(new CallableAdapter<T>(callable, onEDT), true);
259
260            // assign the original callable
261            this.runnable = null;
262            this.callable = callable;
263        }
264
265        public Runnable getRunnable()
266        {
267            return runnable;
268        }
269
270        public Callable<T> getCallable()
271        {
272            return callable;
273        }
274    }
275
276    /**
277     * The minimum priority that a thread can have.
278     */
279    public final static int MIN_PRIORITY = Thread.MIN_PRIORITY;
280
281    /**
282     * The default priority that is assigned to a thread.
283     */
284    public final static int NORM_PRIORITY = Thread.NORM_PRIORITY;
285
286    /**
287     * The maximum priority that a thread can have.
288     */
289    public final static int MAX_PRIORITY = Thread.MAX_PRIORITY;
290
291    /**
292     * parameters
293     */
294    int priority;
295
296    /**
297     * internal
298     */
299    protected Runnable waitingExecution;
300    protected long lastAdd;
301
302    /**
303     * Create a new Processor with specified number of maximum waiting and processing tasks.<br>
304     * 
305     * @param maxWaiting
306     *        The length of waiting queue.
307     * @param numThread
308     *        The maximum number of processing thread.
309     * @param priority
310     *        Processor priority<br>
311     *        <code>Processor.MIN_PRIORITY</code><br>
312     *        <code>Processor.NORM_PRIORITY</code><br>
313     *        <code>Processor.MAX_PRIORITY</code>
314     */
315    public Processor(int maxWaiting, int numThread, int priority)
316    {
317        super(numThread, numThread, 2L, TimeUnit.SECONDS, (maxWaiting == -1) ? new LinkedBlockingQueue<Runnable>()
318                : new LinkedBlockingQueue<Runnable>(maxWaiting));
319
320        setThreadFactory(new ProcessorThreadFactory("Processor"));
321        setRejectedExecutionHandler(new ProcessorRejectedExecutionHandler());
322        allowCoreThreadTimeOut(true);
323
324        this.priority = priority;
325
326        waitingExecution = null;
327    }
328
329    /**
330     * Create a new Processor with specified number of maximum waiting and processing tasks.
331     * 
332     * @param maxWaiting
333     *        The length of waiting queue.
334     * @param numThread
335     *        The maximum number of processing thread.
336     */
337    public Processor(int maxWaiting, int numThread)
338    {
339        this(maxWaiting, numThread, NORM_PRIORITY);
340    }
341
342    /**
343     * Create a new Processor with specified number of processing thread.
344     * 
345     * @param numThread
346     *        The maximum number of processing thread.
347     */
348    public Processor(int numThread)
349    {
350        this(-1, numThread, NORM_PRIORITY);
351    }
352
353    /**
354     * Create a new Processor with default number of maximum waiting and processing tasks.
355     */
356    public Processor()
357    {
358        this(DEFAULT_MAX_WAITING, DEFAULT_MAX_PROCESSING);
359    }
360
361    /**
362     * @deprecated Use {@link #removeFirstWaitingTask(Runnable)} instead.
363     */
364    @Deprecated
365    public boolean removeTask(Runnable task)
366    {
367        return removeFirstWaitingTask(task);
368    }
369
370    /**
371     * @deprecated Use {@link #submit(Runnable)} instead.
372     */
373    @Deprecated
374    public boolean addTask(Runnable task, boolean onEDT, int id)
375    {
376        return addTask(task, onEDT);
377    }
378
379    /**
380     * @deprecated Use {@link #submit(Runnable)} instead.
381     */
382    @Deprecated
383    public boolean addTask(Runnable task, boolean onEDT)
384    {
385        try
386        {
387            submit(task, onEDT);
388        }
389        catch (RejectedExecutionException E)
390        {
391            return false;
392        }
393
394        return true;
395    }
396
397    /**
398     * @deprecated Use {@link #submit(Runnable)} instead.
399     */
400    @Deprecated
401    public boolean addTask(Runnable task)
402    {
403        return addTask(task, false);
404    }
405
406    @Override
407    public boolean remove(Runnable task)
408    {
409        // don't forget to remove the reference here
410        if (waitingExecution == task)
411            waitingExecution = null;
412
413        return super.remove(task);
414    }
415
416    /**
417     * @deprecated Use {@link #newTaskFor(Runnable, Object)} instead.
418     */
419    @Deprecated
420    protected <T> FutureTaskAdapter<T> newTaskFor(Runnable runnable, T value, boolean onEDT)
421    {
422        return new FutureTaskAdapterEDT<T>(runnable, value, onEDT);
423    };
424
425    /**
426     * @deprecated Use {@link #newTaskFor(Callable)} instead.
427     */
428    @Deprecated
429    protected <T> FutureTaskAdapter<T> newTaskFor(Callable<T> callable, boolean onEDT)
430    {
431        return new FutureTaskAdapterEDT<T>(callable, onEDT);
432    }
433
434    /**
435     * @param handledException
436     *        if set to <code>true</code> then any occurring exception during the runnable
437     *        processing will be catch by {@link IcyExceptionHandler}.
438     * @param runnable
439     *        the runnable task being wrapped
440     * @param value
441     *        the default value for the returned future
442     * @return a <tt>RunnableFuture</tt> which when run will run the underlying runnable and which,
443     *         as a <tt>Future</tt>, will yield the given value as its result and provide for
444     *         cancellation of the underlying task.
445     */
446    protected <T> FutureTaskAdapter<T> newTaskFor(boolean handledException, Runnable runnable, T value)
447    {
448        return new FutureTaskAdapter<T>(runnable, value, handledException);
449    };
450
451    /**
452     * @param handledException
453     *        if set to <code>true</code> then any occurring exception during the runnable
454     *        processing will be catch by {@link IcyExceptionHandler}.
455     * @param callable
456     *        the callable task being wrapped
457     * @return a <tt>RunnableFuture</tt> which when run will call the
458     *         underlying callable and which, as a <tt>Future</tt>, will yield
459     *         the callable's result as its result and provide for
460     *         cancellation of the underlying task.
461     */
462    protected <T> FutureTaskAdapter<T> newTaskFor(boolean handledException, Callable<T> callable)
463    {
464        return new FutureTaskAdapter<T>(callable, handledException);
465    }
466
467    @Override
468    public void execute(Runnable task)
469    {
470        super.execute(task);
471        // save the last executed task
472        waitingExecution = task;
473    }
474
475    /**
476     * Submit the given task (internal use only).
477     */
478    protected synchronized <T> FutureTask<T> submit(FutureTaskAdapter<T> task)
479    {
480        execute(task);
481        return task;
482    }
483
484    @Override
485    public Future<?> submit(Runnable task)
486    {
487        if (task == null)
488            throw new NullPointerException();
489
490        return submit(newTaskFor(false, task, null));
491    }
492
493    @Override
494    public <T> Future<T> submit(Runnable task, T result)
495    {
496        if (task == null)
497            throw new NullPointerException();
498
499        return submit(newTaskFor(false, task, result));
500    }
501
502    @Override
503    public <T> Future<T> submit(Callable<T> task)
504    {
505        if (task == null)
506            throw new NullPointerException();
507
508        return submit(newTaskFor(false, task));
509    }
510
511    /**
512     * Submits a Runnable task for execution and returns a Future
513     * representing that task. The Future's <tt>get</tt> method will
514     * return <tt>null</tt> upon <em>successful</em> completion.
515     * 
516     * @param handleException
517     *        if set to <code>true</code> then any occurring exception during the runnable
518     *        processing will be catch by {@link IcyExceptionHandler}.
519     * @param task
520     *        the task to submit
521     * @return a Future representing pending completion of the task
522     * @throws RejectedExecutionException
523     *         if the task cannot be
524     *         scheduled for execution
525     * @throws NullPointerException
526     *         if the task is null
527     */
528    public Future<?> submit(boolean handleException, Runnable task)
529    {
530        if (task == null)
531            throw new NullPointerException();
532
533        return submit(newTaskFor(handleException, task, null));
534    }
535
536    /**
537     * Submits a Runnable task for execution and returns a Future
538     * representing that task. The Future's <tt>get</tt> method will
539     * return the given result upon successful completion.
540     * 
541     * @param handleException
542     *        if set to <code>true</code> then any occurring exception during the runnable
543     *        processing will be catch by {@link IcyExceptionHandler}.
544     * @param task
545     *        the task to submit
546     * @param result
547     *        the result to return
548     * @return a Future representing pending completion of the task
549     * @throws RejectedExecutionException
550     *         if the task cannot be
551     *         scheduled for execution
552     * @throws NullPointerException
553     *         if the task is null
554     */
555    public <T> Future<T> submit(boolean handleException, Runnable task, T result)
556    {
557        if (task == null)
558            throw new NullPointerException();
559
560        return submit(newTaskFor(handleException, task, result));
561    }
562
563    /**
564     * Submits a value-returning task for execution and returns a
565     * Future representing the pending results of the task. The
566     * Future's <tt>get</tt> method will return the task's result upon
567     * successful completion.
568     * <p>
569     * If you would like to immediately block waiting for a task, you can use constructions of the form
570     * <tt>result = exec.submit(aCallable).get();</tt>
571     * <p>
572     * Note: The {@link Executors} class includes a set of methods that can convert some other common closure-like
573     * objects, for example, {@link java.security.PrivilegedAction} to {@link Callable} form so they can be submitted.
574     * 
575     * @param handleException
576     *        if set to <code>true</code> then any occurring exception during the runnable
577     *        processing will be catch by {@link IcyExceptionHandler}.
578     * @param task
579     *        the task to submit
580     * @return a Future representing pending completion of the task
581     * @throws RejectedExecutionException
582     *         if the task cannot be
583     *         scheduled for execution
584     * @throws NullPointerException
585     *         if the task is null
586     */
587    public <T> Future<T> submit(boolean handleException, Callable<T> task)
588    {
589        if (task == null)
590            throw new NullPointerException();
591
592        return submit(newTaskFor(handleException, task));
593    }
594
595    /**
596     * @deprecated Use {@link #submit(Runnable)} instead and ThreadUtil.invokeNow(..) where you need
597     *             it.
598     */
599    @Deprecated
600    public Future<?> submit(Runnable task, boolean onEDT)
601    {
602        if (task == null)
603            throw new NullPointerException();
604
605        return submit(newTaskFor(task, null, onEDT));
606    }
607
608    /**
609     * @deprecated Use {@link #submit(Runnable, Object)} instead and ThreadUtil.invokeNow(..) where
610     *             you
611     *             need it.
612     */
613    @Deprecated
614    public <T> Future<T> submit(Runnable task, T result, boolean onEDT)
615    {
616        if (task == null)
617            throw new NullPointerException();
618
619        return submit(newTaskFor(task, result, onEDT));
620    }
621
622    /**
623     * @deprecated Use {@link #submit(Callable)} instead and ThreadUtil.invokeNow(..) where you need
624     *             it.
625     */
626    @Deprecated
627    public <T> Future<T> submit(Callable<T> task, boolean onEDT)
628    {
629        if (task == null)
630            throw new NullPointerException();
631
632        return submit(newTaskFor(task, onEDT));
633    }
634
635    /**
636     * Return true if one or more process are executing or we still have waiting tasks.
637     */
638    public boolean isProcessing()
639    {
640        return (getActiveCount() > 0) || hasWaitingTasks();
641    }
642
643    /**
644     * Wait for all tasks completion
645     */
646    public void waitAll()
647    {
648        while (isProcessing())
649            ThreadUtil.sleep(1);
650    }
651
652    /**
653     * shutdown and wait current tasks completion
654     */
655    public void shutdownAndWait()
656    {
657        shutdown();
658        while (!isTerminated())
659            ThreadUtil.sleep(1);
660    }
661
662    /**
663     * @return the priority
664     */
665    public int getPriority()
666    {
667        return priority;
668    }
669
670    /**
671     * @param priority
672     *        the priority to set
673     */
674    public void setPriority(int priority)
675    {
676        this.priority = priority;
677    }
678
679    /**
680     * @deprecated Use {@link #getThreadName()} instead
681     */
682    @Deprecated
683    public String getDefaultThreadName()
684    {
685        return ((ProcessorThreadFactory) getThreadFactory()).getName();
686    }
687
688    /**
689     * @deprecated Use {@link #setThreadName(String)} instead
690     */
691    @Deprecated
692    public void setDefaultThreadName(String defaultThreadName)
693    {
694        ((ProcessorThreadFactory) getThreadFactory()).setName(defaultThreadName);
695    }
696
697    /**
698     * Return the thread name.
699     */
700    public String getThreadName()
701    {
702        return ((ProcessorThreadFactory) getThreadFactory()).getName();
703    }
704
705    /**
706     * Set the wanted thread name.
707     */
708    public void setThreadName(String defaultThreadName)
709    {
710        ((ProcessorThreadFactory) getThreadFactory()).setName(defaultThreadName);
711    }
712
713    /**
714     * Get the number of free slot in queue
715     */
716    public int getFreeSlotNumber()
717    {
718        return getQueue().remainingCapacity();
719    }
720
721    /**
722     * Return true if queue is full
723     */
724    public boolean isFull()
725    {
726        return getFreeSlotNumber() == 0;
727    }
728
729    /**
730     * Return waiting tasks
731     */
732    protected List<FutureTaskAdapter<?>> getWaitingTasks()
733    {
734        final BlockingQueue<Runnable> q = getQueue();
735        final List<FutureTaskAdapter<?>> result = new ArrayList<FutureTaskAdapter<?>>();
736
737        synchronized (q)
738        {
739            for (Runnable r : q)
740                result.add((FutureTaskAdapter<?>) r);
741        }
742
743        return result;
744    }
745
746    /**
747     * Return waiting tasks for the specified Runnable instance
748     */
749    protected List<FutureTaskAdapter<?>> getWaitingTasks(Runnable task)
750    {
751        final List<FutureTaskAdapter<?>> result = new ArrayList<FutureTaskAdapter<?>>();
752
753        // scan all tasks
754        for (FutureTaskAdapter<?> f : getWaitingTasks())
755            if (f.runnable == task)
756                result.add(f);
757
758        return result;
759    }
760
761    /**
762     * Return waiting tasks for the specified Callable instance
763     */
764    protected List<FutureTaskAdapter<?>> getWaitingTasks(Callable<?> task)
765    {
766        final List<FutureTaskAdapter<?>> result = new ArrayList<FutureTaskAdapter<?>>();
767
768        // scan all tasks
769        for (FutureTaskAdapter<?> f : getWaitingTasks())
770            if (f.callable == task)
771                result.add(f);
772
773        return result;
774    }
775
776    /**
777     * Return the number of waiting task
778     */
779    public int getWaitingTasksCount()
780    {
781        final int result = getQueue().size();
782
783        // TODO : be sure that waitingExecution pass to null when task has been taken in account.
784        // Queue can be empty right after a task submission.
785        // For this particular case we return 1 if a task has been submitted
786        // and not taken in account with a timeout of 1 second.
787        if ((result == 0) && ((waitingExecution != null) && ((System.currentTimeMillis() - lastAdd) < 1000)))
788            return 1;
789
790        return result;
791    }
792
793    /**
794     * @deprecated Not anymore supported.<br>
795     *             Use {@link #getWaitingTasksCount(Callable)} or {@link #getWaitingTasksCount(Runnable)} instead.
796     */
797    @Deprecated
798    public int getWaitingTasksCount(int id)
799    {
800        return 0;
801    }
802
803    /**
804     * Return the number of task waiting in queue for the specified <tt>Runnable</tt> instance.
805     */
806    public int getWaitingTasksCount(Runnable task)
807    {
808        int result = 0;
809
810        for (FutureTaskAdapter<?> f : getWaitingTasks())
811            if (f.runnable == task)
812                result++;
813
814        return result;
815    }
816
817    /**
818     * Return the number of task waiting in queue for the specified <tt>Callable</tt> instance.
819     */
820    public int getWaitingTasksCount(Callable<?> task)
821    {
822        int result = 0;
823
824        for (FutureTaskAdapter<?> f : getWaitingTasks())
825            if (f.callable == task)
826                result++;
827
828        return result;
829    }
830
831    /**
832     * Return true if we have at least one task waiting in queue
833     */
834    public boolean hasWaitingTasks()
835    {
836        return (getWaitingTasksCount() > 0);
837    }
838
839    /**
840     * @deprecated Not anymore supported.<br>
841     *             Use {@link #hasWaitingTasks(Callable)} or {@link #hasWaitingTasks(Runnable)} instead.
842     */
843    @Deprecated
844    public boolean hasWaitingTasks(int id)
845    {
846        return false;
847    }
848
849    /**
850     * Return true if we have at least one task in queue for the specified <tt>Runnable</tt> instance.
851     */
852    public boolean hasWaitingTasks(Runnable task)
853    {
854        // scan all tasks
855        for (FutureTaskAdapter<?> f : getWaitingTasks())
856            if (f.runnable == task)
857                return true;
858
859        return false;
860    }
861
862    /**
863     * Return true if we have at least one task in queue for the specified <tt>Callable</tt> instance.
864     */
865    public boolean hasWaitingTasks(Callable<?> task)
866    {
867        // scan all tasks
868        for (FutureTaskAdapter<?> f : getWaitingTasks())
869            if (f.callable == task)
870                return true;
871
872        return false;
873    }
874
875    /**
876     * @deprecated Not anymore supported.<br>
877     *             USe {@link #removeFirstWaitingTask(Runnable)} or {@link #removeFirstWaitingTask(Callable)} instead.
878     */
879    @Deprecated
880    public boolean removeFirstWaitingTask(int id)
881    {
882        return false;
883    }
884
885    /**
886     * Remove first waiting task for the specified <tt>FutureTaskAdapter</tt> instance.
887     */
888    protected boolean removeFirstWaitingTask(FutureTaskAdapter<?> task)
889    {
890        if (task == null)
891            return false;
892
893        synchronized (getQueue())
894        {
895            // remove first task of specified instance
896            for (FutureTaskAdapter<?> f : getWaitingTasks())
897                if (f == task)
898                    return remove(f);
899        }
900
901        return false;
902    }
903
904    /**
905     * Remove first waiting task for the specified <tt>Runnable</tt> instance.
906     */
907    public boolean removeFirstWaitingTask(Runnable task)
908    {
909        if (task == null)
910            return false;
911
912        synchronized (getQueue())
913        {
914            // remove first task of specified instance
915            for (FutureTaskAdapter<?> f : getWaitingTasks())
916                if (f.runnable == task)
917                    return remove(f);
918        }
919
920        return false;
921    }
922
923    /**
924     * Remove first waiting task for the specified <tt>Callable</tt> instance.
925     */
926    public boolean removeFirstWaitingTask(Callable<?> task)
927    {
928        if (task == null)
929            return false;
930
931        synchronized (getQueue())
932        {
933            // remove first task of specified instance
934            for (FutureTaskAdapter<?> f : getWaitingTasks())
935                if (f.callable == task)
936                    return remove(f);
937        }
938
939        return false;
940    }
941
942    /**
943     * @deprecated Not anymore supported.<br>
944     *             USe {@link #removeWaitingTasks(Runnable)} or {@link #removeWaitingTasks(Callable)} instead.
945     */
946    @Deprecated
947    public boolean removeWaitingTasks(int id)
948    {
949        return false;
950    }
951
952    /**
953     * Remove all waiting tasks for the specified <tt>Runnable</tt> instance.
954     */
955    public boolean removeWaitingTasks(Runnable task)
956    {
957        boolean result = false;
958
959        synchronized (getQueue())
960        {
961            // remove all tasks of specified instance
962            for (FutureTaskAdapter<?> f : getWaitingTasks(task))
963                result |= remove(f);
964        }
965
966        return result;
967    }
968
969    /**
970     * Remove all waiting tasks for the specified <tt>Callable</tt> instance.
971     */
972    public boolean removeWaitingTasks(Callable<?> task)
973    {
974        boolean result = false;
975
976        synchronized (getQueue())
977        {
978            // remove all tasks of specified instance
979            for (FutureTaskAdapter<?> f : getWaitingTasks(task))
980                result |= remove(f);
981        }
982
983        return result;
984    }
985
986    /**
987     * Clear all waiting tasks
988     */
989    public void removeAllWaitingTasks()
990    {
991        waitingExecution = null;
992
993        final BlockingQueue<Runnable> q = getQueue();
994
995        synchronized (q)
996        {
997            // remove all tasks
998            q.clear();
999        }
1000    }
1001
1002    /**
1003     * @deprecated This method is useless.
1004     */
1005    @Deprecated
1006    public void limitWaitingTask(Runnable task, int value)
1007    {
1008        synchronized (getQueue())
1009        {
1010            final List<FutureTaskAdapter<?>> tasks = getWaitingTasks(task);
1011            final int numToRemove = tasks.size() - value;
1012
1013            for (int i = 0; i < numToRemove; i++)
1014                remove(tasks.get(i));
1015        }
1016    }
1017
1018    /**
1019     * @deprecated Not anymore supported !
1020     */
1021    @Deprecated
1022    public void limitWaitingTask(int id, int value)
1023    {
1024        // not anymore supported
1025    }
1026
1027    /**
1028     * @deprecated This method is useless.
1029     */
1030    @Deprecated
1031    public boolean limitWaitingTask(int value)
1032    {
1033        synchronized (getQueue())
1034        {
1035            final List<FutureTaskAdapter<?>> tasks = getWaitingTasks();
1036            final int numToRemove = tasks.size() - value;
1037
1038            for (int i = 0; i < numToRemove; i++)
1039                remove(tasks.get(i));
1040        }
1041
1042        return false;
1043    }
1044
1045    /**
1046     * @deprecated Useless...
1047     */
1048    @Deprecated
1049    public void addListener(ProcessorEventListener listener)
1050    {
1051        //
1052    }
1053
1054    /**
1055     * @deprecated Useless...
1056     */
1057    @Deprecated
1058    public void removeListener(ProcessorEventListener listener)
1059    {
1060        //
1061    }
1062
1063    /**
1064     * @deprecated useless
1065     */
1066    @Deprecated
1067    public void fireDoneEvent(FutureTaskAdapter<?> task)
1068    {
1069        //
1070    }
1071
1072    @Override
1073    protected void beforeExecute(Thread t, Runnable r)
1074    {
1075        super.beforeExecute(t, r);
1076
1077        // ok we can remove reference...
1078        waitingExecution = null;
1079    }
1080}