001/* 002 * Copyright 2010-2015 Institut Pasteur. 003 * 004 * This file is part of Icy. 005 * 006 * Icy is free software: you can redistribute it and/or modify 007 * it under the terms of the GNU General Public License as published by 008 * the Free Software Foundation, either version 3 of the License, or 009 * (at your option) any later version. 010 * 011 * Icy is distributed in the hope that it will be useful, 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 014 * GNU General Public License for more details. 015 * 016 * You should have received a copy of the GNU General Public License 017 * along with Icy. If not, see <http://www.gnu.org/licenses/>. 018 */ 019package icy.system.thread; 020 021import icy.main.Icy; 022import icy.system.IcyExceptionHandler; 023import icy.system.SystemUtil; 024 025import java.util.ArrayList; 026import java.util.EventListener; 027import java.util.List; 028import java.util.concurrent.BlockingQueue; 029import java.util.concurrent.Callable; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.FutureTask; 033import java.util.concurrent.LinkedBlockingQueue; 034import java.util.concurrent.RejectedExecutionException; 035import java.util.concurrent.RejectedExecutionHandler; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039 040/** 041 * Processor class.<br> 042 * Allow you to queue and execute tasks on a defined set of thread. 043 * 044 * @author stephane 045 */ 046public class Processor extends ThreadPoolExecutor 047{ 048 public static final int DEFAULT_MAX_WAITING = 1024; 049 public static final int DEFAULT_MAX_PROCESSING = SystemUtil.getNumberOfCPUs(); 050 051 /** 052 * @deprecated Useless interface 053 */ 054 @Deprecated 055 public interface ProcessorEventListener extends EventListener 056 { 057 public void processDone(Processor source, Runnable runnable); 058 } 059 060 protected class ProcessorThreadFactory implements ThreadFactory 061 { 062 String name; 063 064 public ProcessorThreadFactory(String name) 065 { 066 super(); 067 068 setName(name); 069 } 070 071 public String getName() 072 { 073 return name; 074 } 075 076 public void setName(String value) 077 { 078 this.name = value; 079 } 080 081 String getThreadName() 082 { 083 String result = name; 084 085 return result; 086 } 087 088 @Override 089 public Thread newThread(Runnable r) 090 { 091 final Thread result = new Thread(r, getThreadName()); 092 093 result.setPriority(priority); 094 095 return result; 096 } 097 } 098 099 protected class ProcessorRejectedExecutionHandler implements RejectedExecutionHandler 100 { 101 @Override 102 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 103 { 104 // ignore if we try to submit process while Icy is exiting 105 if (!Icy.isExiting()) 106 throw new RejectedExecutionException("Cannot add new task, ignore execution of " + r); 107 } 108 } 109 110 protected class FutureTaskAdapter<T> extends FutureTask<T> 111 { 112 public Runnable runnable; 113 public Callable<T> callable; 114 final boolean handleException; 115 116 public FutureTaskAdapter(Runnable runnable, T result, boolean handleException) 117 { 118 super(runnable, result); 119 120 this.runnable = runnable; 121 this.callable = null; 122 this.handleException = handleException; 123 } 124 125 public FutureTaskAdapter(Runnable runnable, boolean handleException) 126 { 127 this(runnable, null, handleException); 128 } 129 130 public FutureTaskAdapter(Callable<T> callable, boolean handleException) 131 { 132 super(callable); 133 134 this.runnable = null; 135 this.callable = callable; 136 this.handleException = handleException; 137 } 138 139 @Override 140 protected void done() 141 { 142 super.done(); 143 144 if (handleException) 145 { 146 try 147 { 148 get(); 149 } 150 catch (Exception e) 151 { 152 IcyExceptionHandler.handleException(e.getCause(), true); 153 } 154 } 155 } 156 } 157 158 /** 159 * @deprecated 160 */ 161 @Deprecated 162 protected class RunnableAdapter implements Runnable 163 { 164 private final Runnable task; 165 private final boolean onEDT; 166 167 public RunnableAdapter(Runnable runnable, boolean onEDT) 168 { 169 super(); 170 171 task = runnable; 172 this.onEDT = onEDT; 173 } 174 175 @Override 176 public void run() 177 { 178 if (task != null) 179 { 180 if (onEDT) 181 ThreadUtil.invokeNow(task); 182 else 183 task.run(); 184 } 185 } 186 187 /** 188 * @return the task 189 */ 190 public Runnable getTask() 191 { 192 return task; 193 } 194 } 195 196 /** 197 * @deprecated 198 */ 199 @Deprecated 200 protected class CallableAdapter<T> implements Callable<T> 201 { 202 private final Callable<T> task; 203 private final boolean onEDT; 204 205 public CallableAdapter(Callable<T> task, boolean onEDT) 206 { 207 super(); 208 209 this.task = task; 210 this.onEDT = onEDT; 211 } 212 213 /** 214 * @return the task 215 */ 216 public Callable<T> getTask() 217 { 218 return task; 219 } 220 221 @Override 222 public T call() throws Exception 223 { 224 if (task != null) 225 { 226 if (onEDT) 227 return ThreadUtil.invokeNow(task); 228 229 return task.call(); 230 } 231 232 return null; 233 } 234 } 235 236 /** 237 * @deprecated 238 */ 239 @Deprecated 240 protected class FutureTaskAdapterEDT<T> extends FutureTaskAdapter<T> 241 { 242 public FutureTaskAdapterEDT(Runnable runnable, T result, boolean onEDT) 243 { 244 super(new RunnableAdapter(runnable, onEDT), result, true); 245 246 // assign the original runnable 247 this.runnable = runnable; 248 this.callable = null; 249 } 250 251 public FutureTaskAdapterEDT(Runnable runnable, boolean onEDT) 252 { 253 this(runnable, null, onEDT); 254 } 255 256 public FutureTaskAdapterEDT(Callable<T> callable, boolean onEDT) 257 { 258 super(new CallableAdapter<T>(callable, onEDT), true); 259 260 // assign the original callable 261 this.runnable = null; 262 this.callable = callable; 263 } 264 265 public Runnable getRunnable() 266 { 267 return runnable; 268 } 269 270 public Callable<T> getCallable() 271 { 272 return callable; 273 } 274 } 275 276 /** 277 * The minimum priority that a thread can have. 278 */ 279 public final static int MIN_PRIORITY = Thread.MIN_PRIORITY; 280 281 /** 282 * The default priority that is assigned to a thread. 283 */ 284 public final static int NORM_PRIORITY = Thread.NORM_PRIORITY; 285 286 /** 287 * The maximum priority that a thread can have. 288 */ 289 public final static int MAX_PRIORITY = Thread.MAX_PRIORITY; 290 291 /** 292 * parameters 293 */ 294 int priority; 295 296 /** 297 * internal 298 */ 299 protected Runnable waitingExecution; 300 protected long lastAdd; 301 302 /** 303 * Create a new Processor with specified number of maximum waiting and processing tasks.<br> 304 * 305 * @param maxWaiting 306 * The length of waiting queue. 307 * @param numThread 308 * The maximum number of processing thread. 309 * @param priority 310 * Processor priority<br> 311 * <code>Processor.MIN_PRIORITY</code><br> 312 * <code>Processor.NORM_PRIORITY</code><br> 313 * <code>Processor.MAX_PRIORITY</code> 314 */ 315 public Processor(int maxWaiting, int numThread, int priority) 316 { 317 super(numThread, numThread, 2L, TimeUnit.SECONDS, (maxWaiting == -1) ? new LinkedBlockingQueue<Runnable>() 318 : new LinkedBlockingQueue<Runnable>(maxWaiting)); 319 320 setThreadFactory(new ProcessorThreadFactory("Processor")); 321 setRejectedExecutionHandler(new ProcessorRejectedExecutionHandler()); 322 allowCoreThreadTimeOut(true); 323 324 this.priority = priority; 325 326 waitingExecution = null; 327 } 328 329 /** 330 * Create a new Processor with specified number of maximum waiting and processing tasks. 331 * 332 * @param maxWaiting 333 * The length of waiting queue. 334 * @param numThread 335 * The maximum number of processing thread. 336 */ 337 public Processor(int maxWaiting, int numThread) 338 { 339 this(maxWaiting, numThread, NORM_PRIORITY); 340 } 341 342 /** 343 * Create a new Processor with specified number of processing thread. 344 * 345 * @param numThread 346 * The maximum number of processing thread. 347 */ 348 public Processor(int numThread) 349 { 350 this(-1, numThread, NORM_PRIORITY); 351 } 352 353 /** 354 * Create a new Processor with default number of maximum waiting and processing tasks. 355 */ 356 public Processor() 357 { 358 this(DEFAULT_MAX_WAITING, DEFAULT_MAX_PROCESSING); 359 } 360 361 /** 362 * @deprecated Use {@link #removeFirstWaitingTask(Runnable)} instead. 363 */ 364 @Deprecated 365 public boolean removeTask(Runnable task) 366 { 367 return removeFirstWaitingTask(task); 368 } 369 370 /** 371 * @deprecated Use {@link #submit(Runnable)} instead. 372 */ 373 @Deprecated 374 public boolean addTask(Runnable task, boolean onEDT, int id) 375 { 376 return addTask(task, onEDT); 377 } 378 379 /** 380 * @deprecated Use {@link #submit(Runnable)} instead. 381 */ 382 @Deprecated 383 public boolean addTask(Runnable task, boolean onEDT) 384 { 385 try 386 { 387 submit(task, onEDT); 388 } 389 catch (RejectedExecutionException E) 390 { 391 return false; 392 } 393 394 return true; 395 } 396 397 /** 398 * @deprecated Use {@link #submit(Runnable)} instead. 399 */ 400 @Deprecated 401 public boolean addTask(Runnable task) 402 { 403 return addTask(task, false); 404 } 405 406 @Override 407 public boolean remove(Runnable task) 408 { 409 // don't forget to remove the reference here 410 if (waitingExecution == task) 411 waitingExecution = null; 412 413 return super.remove(task); 414 } 415 416 /** 417 * @deprecated Use {@link #newTaskFor(Runnable, Object)} instead. 418 */ 419 @Deprecated 420 protected <T> FutureTaskAdapter<T> newTaskFor(Runnable runnable, T value, boolean onEDT) 421 { 422 return new FutureTaskAdapterEDT<T>(runnable, value, onEDT); 423 }; 424 425 /** 426 * @deprecated Use {@link #newTaskFor(Callable)} instead. 427 */ 428 @Deprecated 429 protected <T> FutureTaskAdapter<T> newTaskFor(Callable<T> callable, boolean onEDT) 430 { 431 return new FutureTaskAdapterEDT<T>(callable, onEDT); 432 } 433 434 /** 435 * @param handledException 436 * if set to <code>true</code> then any occurring exception during the runnable 437 * processing will be catch by {@link IcyExceptionHandler}. 438 * @param runnable 439 * the runnable task being wrapped 440 * @param value 441 * the default value for the returned future 442 * @return a <tt>RunnableFuture</tt> which when run will run the underlying runnable and which, 443 * as a <tt>Future</tt>, will yield the given value as its result and provide for 444 * cancellation of the underlying task. 445 */ 446 protected <T> FutureTaskAdapter<T> newTaskFor(boolean handledException, Runnable runnable, T value) 447 { 448 return new FutureTaskAdapter<T>(runnable, value, handledException); 449 }; 450 451 /** 452 * @param handledException 453 * if set to <code>true</code> then any occurring exception during the runnable 454 * processing will be catch by {@link IcyExceptionHandler}. 455 * @param callable 456 * the callable task being wrapped 457 * @return a <tt>RunnableFuture</tt> which when run will call the 458 * underlying callable and which, as a <tt>Future</tt>, will yield 459 * the callable's result as its result and provide for 460 * cancellation of the underlying task. 461 */ 462 protected <T> FutureTaskAdapter<T> newTaskFor(boolean handledException, Callable<T> callable) 463 { 464 return new FutureTaskAdapter<T>(callable, handledException); 465 } 466 467 @Override 468 public void execute(Runnable task) 469 { 470 super.execute(task); 471 // save the last executed task 472 waitingExecution = task; 473 } 474 475 /** 476 * Submit the given task (internal use only). 477 */ 478 protected synchronized <T> FutureTask<T> submit(FutureTaskAdapter<T> task) 479 { 480 execute(task); 481 return task; 482 } 483 484 @Override 485 public Future<?> submit(Runnable task) 486 { 487 if (task == null) 488 throw new NullPointerException(); 489 490 return submit(newTaskFor(false, task, null)); 491 } 492 493 @Override 494 public <T> Future<T> submit(Runnable task, T result) 495 { 496 if (task == null) 497 throw new NullPointerException(); 498 499 return submit(newTaskFor(false, task, result)); 500 } 501 502 @Override 503 public <T> Future<T> submit(Callable<T> task) 504 { 505 if (task == null) 506 throw new NullPointerException(); 507 508 return submit(newTaskFor(false, task)); 509 } 510 511 /** 512 * Submits a Runnable task for execution and returns a Future 513 * representing that task. The Future's <tt>get</tt> method will 514 * return <tt>null</tt> upon <em>successful</em> completion. 515 * 516 * @param handleException 517 * if set to <code>true</code> then any occurring exception during the runnable 518 * processing will be catch by {@link IcyExceptionHandler}. 519 * @param task 520 * the task to submit 521 * @return a Future representing pending completion of the task 522 * @throws RejectedExecutionException 523 * if the task cannot be 524 * scheduled for execution 525 * @throws NullPointerException 526 * if the task is null 527 */ 528 public Future<?> submit(boolean handleException, Runnable task) 529 { 530 if (task == null) 531 throw new NullPointerException(); 532 533 return submit(newTaskFor(handleException, task, null)); 534 } 535 536 /** 537 * Submits a Runnable task for execution and returns a Future 538 * representing that task. The Future's <tt>get</tt> method will 539 * return the given result upon successful completion. 540 * 541 * @param handleException 542 * if set to <code>true</code> then any occurring exception during the runnable 543 * processing will be catch by {@link IcyExceptionHandler}. 544 * @param task 545 * the task to submit 546 * @param result 547 * the result to return 548 * @return a Future representing pending completion of the task 549 * @throws RejectedExecutionException 550 * if the task cannot be 551 * scheduled for execution 552 * @throws NullPointerException 553 * if the task is null 554 */ 555 public <T> Future<T> submit(boolean handleException, Runnable task, T result) 556 { 557 if (task == null) 558 throw new NullPointerException(); 559 560 return submit(newTaskFor(handleException, task, result)); 561 } 562 563 /** 564 * Submits a value-returning task for execution and returns a 565 * Future representing the pending results of the task. The 566 * Future's <tt>get</tt> method will return the task's result upon 567 * successful completion. 568 * <p> 569 * If you would like to immediately block waiting for a task, you can use constructions of the form 570 * <tt>result = exec.submit(aCallable).get();</tt> 571 * <p> 572 * Note: The {@link Executors} class includes a set of methods that can convert some other common closure-like 573 * objects, for example, {@link java.security.PrivilegedAction} to {@link Callable} form so they can be submitted. 574 * 575 * @param handleException 576 * if set to <code>true</code> then any occurring exception during the runnable 577 * processing will be catch by {@link IcyExceptionHandler}. 578 * @param task 579 * the task to submit 580 * @return a Future representing pending completion of the task 581 * @throws RejectedExecutionException 582 * if the task cannot be 583 * scheduled for execution 584 * @throws NullPointerException 585 * if the task is null 586 */ 587 public <T> Future<T> submit(boolean handleException, Callable<T> task) 588 { 589 if (task == null) 590 throw new NullPointerException(); 591 592 return submit(newTaskFor(handleException, task)); 593 } 594 595 /** 596 * @deprecated Use {@link #submit(Runnable)} instead and ThreadUtil.invokeNow(..) where you need 597 * it. 598 */ 599 @Deprecated 600 public Future<?> submit(Runnable task, boolean onEDT) 601 { 602 if (task == null) 603 throw new NullPointerException(); 604 605 return submit(newTaskFor(task, null, onEDT)); 606 } 607 608 /** 609 * @deprecated Use {@link #submit(Runnable, Object)} instead and ThreadUtil.invokeNow(..) where 610 * you 611 * need it. 612 */ 613 @Deprecated 614 public <T> Future<T> submit(Runnable task, T result, boolean onEDT) 615 { 616 if (task == null) 617 throw new NullPointerException(); 618 619 return submit(newTaskFor(task, result, onEDT)); 620 } 621 622 /** 623 * @deprecated Use {@link #submit(Callable)} instead and ThreadUtil.invokeNow(..) where you need 624 * it. 625 */ 626 @Deprecated 627 public <T> Future<T> submit(Callable<T> task, boolean onEDT) 628 { 629 if (task == null) 630 throw new NullPointerException(); 631 632 return submit(newTaskFor(task, onEDT)); 633 } 634 635 /** 636 * Return true if one or more process are executing or we still have waiting tasks. 637 */ 638 public boolean isProcessing() 639 { 640 return (getActiveCount() > 0) || hasWaitingTasks(); 641 } 642 643 /** 644 * Wait for all tasks completion 645 */ 646 public void waitAll() 647 { 648 while (isProcessing()) 649 ThreadUtil.sleep(1); 650 } 651 652 /** 653 * shutdown and wait current tasks completion 654 */ 655 public void shutdownAndWait() 656 { 657 shutdown(); 658 while (!isTerminated()) 659 ThreadUtil.sleep(1); 660 } 661 662 /** 663 * @return the priority 664 */ 665 public int getPriority() 666 { 667 return priority; 668 } 669 670 /** 671 * @param priority 672 * the priority to set 673 */ 674 public void setPriority(int priority) 675 { 676 this.priority = priority; 677 } 678 679 /** 680 * @deprecated Use {@link #getThreadName()} instead 681 */ 682 @Deprecated 683 public String getDefaultThreadName() 684 { 685 return ((ProcessorThreadFactory) getThreadFactory()).getName(); 686 } 687 688 /** 689 * @deprecated Use {@link #setThreadName(String)} instead 690 */ 691 @Deprecated 692 public void setDefaultThreadName(String defaultThreadName) 693 { 694 ((ProcessorThreadFactory) getThreadFactory()).setName(defaultThreadName); 695 } 696 697 /** 698 * Return the thread name. 699 */ 700 public String getThreadName() 701 { 702 return ((ProcessorThreadFactory) getThreadFactory()).getName(); 703 } 704 705 /** 706 * Set the wanted thread name. 707 */ 708 public void setThreadName(String defaultThreadName) 709 { 710 ((ProcessorThreadFactory) getThreadFactory()).setName(defaultThreadName); 711 } 712 713 /** 714 * Get the number of free slot in queue 715 */ 716 public int getFreeSlotNumber() 717 { 718 return getQueue().remainingCapacity(); 719 } 720 721 /** 722 * Return true if queue is full 723 */ 724 public boolean isFull() 725 { 726 return getFreeSlotNumber() == 0; 727 } 728 729 /** 730 * Return waiting tasks 731 */ 732 protected List<FutureTaskAdapter<?>> getWaitingTasks() 733 { 734 final BlockingQueue<Runnable> q = getQueue(); 735 final List<FutureTaskAdapter<?>> result = new ArrayList<FutureTaskAdapter<?>>(); 736 737 synchronized (q) 738 { 739 for (Runnable r : q) 740 result.add((FutureTaskAdapter<?>) r); 741 } 742 743 return result; 744 } 745 746 /** 747 * Return waiting tasks for the specified Runnable instance 748 */ 749 protected List<FutureTaskAdapter<?>> getWaitingTasks(Runnable task) 750 { 751 final List<FutureTaskAdapter<?>> result = new ArrayList<FutureTaskAdapter<?>>(); 752 753 // scan all tasks 754 for (FutureTaskAdapter<?> f : getWaitingTasks()) 755 if (f.runnable == task) 756 result.add(f); 757 758 return result; 759 } 760 761 /** 762 * Return waiting tasks for the specified Callable instance 763 */ 764 protected List<FutureTaskAdapter<?>> getWaitingTasks(Callable<?> task) 765 { 766 final List<FutureTaskAdapter<?>> result = new ArrayList<FutureTaskAdapter<?>>(); 767 768 // scan all tasks 769 for (FutureTaskAdapter<?> f : getWaitingTasks()) 770 if (f.callable == task) 771 result.add(f); 772 773 return result; 774 } 775 776 /** 777 * Return the number of waiting task 778 */ 779 public int getWaitingTasksCount() 780 { 781 final int result = getQueue().size(); 782 783 // TODO : be sure that waitingExecution pass to null when task has been taken in account. 784 // Queue can be empty right after a task submission. 785 // For this particular case we return 1 if a task has been submitted 786 // and not taken in account with a timeout of 1 second. 787 if ((result == 0) && ((waitingExecution != null) && ((System.currentTimeMillis() - lastAdd) < 1000))) 788 return 1; 789 790 return result; 791 } 792 793 /** 794 * @deprecated Not anymore supported.<br> 795 * Use {@link #getWaitingTasksCount(Callable)} or {@link #getWaitingTasksCount(Runnable)} instead. 796 */ 797 @Deprecated 798 public int getWaitingTasksCount(int id) 799 { 800 return 0; 801 } 802 803 /** 804 * Return the number of task waiting in queue for the specified <tt>Runnable</tt> instance. 805 */ 806 public int getWaitingTasksCount(Runnable task) 807 { 808 int result = 0; 809 810 for (FutureTaskAdapter<?> f : getWaitingTasks()) 811 if (f.runnable == task) 812 result++; 813 814 return result; 815 } 816 817 /** 818 * Return the number of task waiting in queue for the specified <tt>Callable</tt> instance. 819 */ 820 public int getWaitingTasksCount(Callable<?> task) 821 { 822 int result = 0; 823 824 for (FutureTaskAdapter<?> f : getWaitingTasks()) 825 if (f.callable == task) 826 result++; 827 828 return result; 829 } 830 831 /** 832 * Return true if we have at least one task waiting in queue 833 */ 834 public boolean hasWaitingTasks() 835 { 836 return (getWaitingTasksCount() > 0); 837 } 838 839 /** 840 * @deprecated Not anymore supported.<br> 841 * Use {@link #hasWaitingTasks(Callable)} or {@link #hasWaitingTasks(Runnable)} instead. 842 */ 843 @Deprecated 844 public boolean hasWaitingTasks(int id) 845 { 846 return false; 847 } 848 849 /** 850 * Return true if we have at least one task in queue for the specified <tt>Runnable</tt> instance. 851 */ 852 public boolean hasWaitingTasks(Runnable task) 853 { 854 // scan all tasks 855 for (FutureTaskAdapter<?> f : getWaitingTasks()) 856 if (f.runnable == task) 857 return true; 858 859 return false; 860 } 861 862 /** 863 * Return true if we have at least one task in queue for the specified <tt>Callable</tt> instance. 864 */ 865 public boolean hasWaitingTasks(Callable<?> task) 866 { 867 // scan all tasks 868 for (FutureTaskAdapter<?> f : getWaitingTasks()) 869 if (f.callable == task) 870 return true; 871 872 return false; 873 } 874 875 /** 876 * @deprecated Not anymore supported.<br> 877 * USe {@link #removeFirstWaitingTask(Runnable)} or {@link #removeFirstWaitingTask(Callable)} instead. 878 */ 879 @Deprecated 880 public boolean removeFirstWaitingTask(int id) 881 { 882 return false; 883 } 884 885 /** 886 * Remove first waiting task for the specified <tt>FutureTaskAdapter</tt> instance. 887 */ 888 protected boolean removeFirstWaitingTask(FutureTaskAdapter<?> task) 889 { 890 if (task == null) 891 return false; 892 893 synchronized (getQueue()) 894 { 895 // remove first task of specified instance 896 for (FutureTaskAdapter<?> f : getWaitingTasks()) 897 if (f == task) 898 return remove(f); 899 } 900 901 return false; 902 } 903 904 /** 905 * Remove first waiting task for the specified <tt>Runnable</tt> instance. 906 */ 907 public boolean removeFirstWaitingTask(Runnable task) 908 { 909 if (task == null) 910 return false; 911 912 synchronized (getQueue()) 913 { 914 // remove first task of specified instance 915 for (FutureTaskAdapter<?> f : getWaitingTasks()) 916 if (f.runnable == task) 917 return remove(f); 918 } 919 920 return false; 921 } 922 923 /** 924 * Remove first waiting task for the specified <tt>Callable</tt> instance. 925 */ 926 public boolean removeFirstWaitingTask(Callable<?> task) 927 { 928 if (task == null) 929 return false; 930 931 synchronized (getQueue()) 932 { 933 // remove first task of specified instance 934 for (FutureTaskAdapter<?> f : getWaitingTasks()) 935 if (f.callable == task) 936 return remove(f); 937 } 938 939 return false; 940 } 941 942 /** 943 * @deprecated Not anymore supported.<br> 944 * USe {@link #removeWaitingTasks(Runnable)} or {@link #removeWaitingTasks(Callable)} instead. 945 */ 946 @Deprecated 947 public boolean removeWaitingTasks(int id) 948 { 949 return false; 950 } 951 952 /** 953 * Remove all waiting tasks for the specified <tt>Runnable</tt> instance. 954 */ 955 public boolean removeWaitingTasks(Runnable task) 956 { 957 boolean result = false; 958 959 synchronized (getQueue()) 960 { 961 // remove all tasks of specified instance 962 for (FutureTaskAdapter<?> f : getWaitingTasks(task)) 963 result |= remove(f); 964 } 965 966 return result; 967 } 968 969 /** 970 * Remove all waiting tasks for the specified <tt>Callable</tt> instance. 971 */ 972 public boolean removeWaitingTasks(Callable<?> task) 973 { 974 boolean result = false; 975 976 synchronized (getQueue()) 977 { 978 // remove all tasks of specified instance 979 for (FutureTaskAdapter<?> f : getWaitingTasks(task)) 980 result |= remove(f); 981 } 982 983 return result; 984 } 985 986 /** 987 * Clear all waiting tasks 988 */ 989 public void removeAllWaitingTasks() 990 { 991 waitingExecution = null; 992 993 final BlockingQueue<Runnable> q = getQueue(); 994 995 synchronized (q) 996 { 997 // remove all tasks 998 q.clear(); 999 } 1000 } 1001 1002 /** 1003 * @deprecated This method is useless. 1004 */ 1005 @Deprecated 1006 public void limitWaitingTask(Runnable task, int value) 1007 { 1008 synchronized (getQueue()) 1009 { 1010 final List<FutureTaskAdapter<?>> tasks = getWaitingTasks(task); 1011 final int numToRemove = tasks.size() - value; 1012 1013 for (int i = 0; i < numToRemove; i++) 1014 remove(tasks.get(i)); 1015 } 1016 } 1017 1018 /** 1019 * @deprecated Not anymore supported ! 1020 */ 1021 @Deprecated 1022 public void limitWaitingTask(int id, int value) 1023 { 1024 // not anymore supported 1025 } 1026 1027 /** 1028 * @deprecated This method is useless. 1029 */ 1030 @Deprecated 1031 public boolean limitWaitingTask(int value) 1032 { 1033 synchronized (getQueue()) 1034 { 1035 final List<FutureTaskAdapter<?>> tasks = getWaitingTasks(); 1036 final int numToRemove = tasks.size() - value; 1037 1038 for (int i = 0; i < numToRemove; i++) 1039 remove(tasks.get(i)); 1040 } 1041 1042 return false; 1043 } 1044 1045 /** 1046 * @deprecated Useless... 1047 */ 1048 @Deprecated 1049 public void addListener(ProcessorEventListener listener) 1050 { 1051 // 1052 } 1053 1054 /** 1055 * @deprecated Useless... 1056 */ 1057 @Deprecated 1058 public void removeListener(ProcessorEventListener listener) 1059 { 1060 // 1061 } 1062 1063 /** 1064 * @deprecated useless 1065 */ 1066 @Deprecated 1067 public void fireDoneEvent(FutureTaskAdapter<?> task) 1068 { 1069 // 1070 } 1071 1072 @Override 1073 protected void beforeExecute(Thread t, Runnable r) 1074 { 1075 super.beforeExecute(t, r); 1076 1077 // ok we can remove reference... 1078 waitingExecution = null; 1079 } 1080}