Class ProcessingService
java.lang.Object
org.ojalgo.concurrent.ProcessingService
A simple wrapper around an ExecutorService that makes it easier to process collections of items in parallel. The work items are processed by a Consumer, Function or TwoStepMapper. In particular, the TwoStepMapper can be used to aggregate/reduce data from the work items and then combine the collected data into a final result.
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) static final class(package private) static final class -
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
Modifier and Type, Method, Description
<W,R> Map<W,R> compute(Collection<W> work, int parallelism, Function<W, R> computer)
    Compute an output item for each (unique) input item, and return the results as a Map.
<W,R> Map<W,R> compute(Collection<W> work, Function<W, R> computer)
    Using parallelism Parallelism.CORES.
<W,R> Map<W,R> compute(Collection<W> work, IntSupplier parallelism, Function<W, R> computer)
divider()
    Deprecated.
<W,R> Collection<R> map(Collection<W> work, int parallelism, Function<W, R> mapper)
    Simply map each (unique) input item to an output item - a Collection of input results in a Collection of output.
<W,R> Collection<R> map(Collection<W> work, Function<W, R> mapper)
    Using parallelism Parallelism.CORES.
<W,R> Collection<R> map(Collection<W> work, IntSupplier parallelism, Function<W, R> mapper)
static ProcessingService newInstance(String name)
<W> void process(Collection<? extends W> work, int parallelism, Consumer<W> processor)
    Will create at most parallelism tasks to work through the work items, processing them with processor.
<W> void process(Collection<? extends W> work, Consumer<W> processor)
    Using parallelism Parallelism.CORES.
<W> void process(Collection<? extends W> work, IntSupplier parallelism, Consumer<W> processor)
<W> void processPair(W work1, W work2, Consumer<W> processor)
    Just 2 work items.
<W> void processTriplet(W work1, W work2, W work3, Consumer<W> processor)
    Just 3 work items.
<W,R> R reduce(Collection<W> work, int parallelism, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
    Deprecated. v54 Use reduceMergeable(Collection<W>, int, Supplier<? extends TwoStepMapper.Mergeable<W, R>>) instead.
<W,R> R reduce(Collection<W> work, IntSupplier parallelism, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
    Deprecated. v54 Use reduceMergeable(Collection<W>, IntSupplier, Supplier<? extends TwoStepMapper.Mergeable<W, R>>) instead.
<W,R> R reduce(Collection<W> work, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
    Deprecated. v54 Use reduceMergeable(Collection<W>, Supplier<? extends TwoStepMapper.Mergeable<W, R>>) instead.
<W, R, A extends TwoStepMapper.Combineable<W,R,A>> R reduceCombineable(Collection<W> work, int parallelism, Supplier<A> reducer)
    Will create at most parallelism tasks to work through the work items, processing them with reducer.
<W, R, A extends TwoStepMapper.Combineable<W,R,A>> R reduceCombineable(Collection<W> work, IntSupplier parallelism, Supplier<A> reducer)
<W, R, A extends TwoStepMapper.Combineable<W,R,A>> R reduceCombineable(Collection<W> work, Supplier<A> reducer)
    Using parallelism Parallelism.CORES.
<W, R, A extends TwoStepMapper.Mergeable<W,R>> R reduceMergeable(Collection<W> work, int parallelism, Supplier<A> reducer)
    Will create at most parallelism tasks to work through the work items, processing them with reducer.
<W, R, A extends TwoStepMapper.Mergeable<W,R>> R reduceMergeable(Collection<W> work, IntSupplier parallelism, Supplier<A> reducer)
<W, R, A extends TwoStepMapper.Mergeable<W,R>> R reduceMergeable(Collection<W> work, Supplier<A> reducer)
    Using parallelism Parallelism.CORES.
void
    Will create precisely parallelism tasks that each execute the processor.
void
void run(IntSupplier parallelism, Runnable processor)
<T> AtomicBoolean take(BlockingQueue<T> queue, int parallelism, Consumer<T> processor)
    Will submit precisely parallelism tasks that each take from the queue feeding the items to the processor.
<T> AtomicBoolean take(BlockingQueue<T> queue, IntSupplier parallelism, Consumer<T> processor)
-
Field Details
-
INSTANCE
-
myExecutor
-
-
Constructor Details
-
ProcessingService
-
-
Method Details
-
newInstance
-
compute
Using parallelism Parallelism.CORES.
-
compute
Compute an output item for each (unique) input item, and return the results as a Map. If the input contains duplicates, the output will have fewer items. It is therefore vital that the input type implements Object.hashCode() and Object.equals(Object) properly.
Will create at most parallelism tasks to work through the work items, processing them with computer and collecting the results in a Map.
Type Parameters:
W - The work item type
R - The function return type
Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
computer - The processing code
Returns:
A map of function input to output
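The behaviour described above (duplicates collapsing, at most parallelism workers, results collected in a Map) can be illustrated with a minimal sketch that uses only java.util.concurrent. This is not ojAlgo's implementation: the class name ComputeSketch, the shared-queue strategy and the per-call thread pool are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration only; not part of ojAlgo.
final class ComputeSketch {

    /** At most `parallelism` workers drain a shared queue of unique, non-null work items. */
    static <W, R> Map<W, R> compute(Collection<W> work, int parallelism, Function<W, R> computer) {
        // Duplicates collapse here, which relies on hashCode()/equals() of W.
        Queue<W> pending = new ConcurrentLinkedQueue<>(new LinkedHashSet<>(work));
        // ConcurrentHashMap rejects nulls, so this sketch assumes non-null results.
        Map<W, R> results = new ConcurrentHashMap<>();
        int workers = Math.max(1, Math.min(parallelism, pending.size()));
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < workers; i++) {
                futures.add(executor.submit(() -> {
                    W item;
                    while ((item = pending.poll()) != null) {
                        results.put(item, computer.apply(item));
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for the worker and surface any failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
        return results;
    }
}
```

Calling the sketch with the input "a", "bb", "a", "ccc" and String::length yields three map entries rather than four, because the repeated "a" is a single key.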
-
compute
-
divider
Deprecated. v56 Use newDivider() or ParallelismSupplier.newDivider(int) instead.
-
getExecutor
Returns:
The underlying ExecutorService
-
map
Using parallelism Parallelism.CORES.
-
map
Simply map each (unique) input item to an output item - a Collection of input results in a Collection of output. If the input contains duplicates, the output will have fewer items. It is therefore vital that the input type implements Object.hashCode() and Object.equals(Object) properly.
Type Parameters:
W - The input item type
R - The output item type
Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
mapper - The mapper function
Returns:
The mapped results
-
map
-
newDivider
-
process
Using parallelism Parallelism.CORES.
-
process
Will create at most parallelism tasks to work through the work items, processing them with processor.
Type Parameters:
W - The work item type
Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
processor - The processing code
-
process
public <W> void process(Collection<? extends W> work, IntSupplier parallelism, Consumer<W> processor)
-
processPair
-
processTriplet
Just 3 work items.
-
reduce
@Deprecated public <W,R> R reduce(Collection<W> work, int parallelism, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
Deprecated. v54 Use reduceMergeable(Collection<W>, int, Supplier<? extends TwoStepMapper.Mergeable<W, R>>) instead.
-
reduce
@Deprecated public <W,R> R reduce(Collection<W> work, IntSupplier parallelism, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
Deprecated. v54 Use reduceMergeable(Collection<W>, IntSupplier, Supplier<? extends TwoStepMapper.Mergeable<W, R>>) instead.
-
reduce
@Deprecated public <W,R> R reduce(Collection<W> work, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
Deprecated. v54 Use reduceMergeable(Collection<W>, Supplier<? extends TwoStepMapper.Mergeable<W, R>>) instead.
-
reduceCombineable
public <W, R, A extends TwoStepMapper.Combineable<W,R,A>> R reduceCombineable(Collection<W> work, int parallelism, Supplier<A> reducer)
Will create at most parallelism tasks to work through the work items, processing them with reducer. The state of each task's reducer will be combined into a single instance, and the results of that instance will be returned.
Each TwoStepMapper.Combineable is only worked on by a single thread, and the results are combined into a single instance. The instances are not reused.
Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
reducer - A TwoStepMapper.Combineable implementation that does what you want.
Returns:
The results...
-
reduceCombineable
public <W, R, A extends TwoStepMapper.Combineable<W,R,A>> R reduceCombineable(Collection<W> work, IntSupplier parallelism, Supplier<A> reducer)
-
reduceCombineable
public <W, R, A extends TwoStepMapper.Combineable<W,R,A>> R reduceCombineable(Collection<W> work, Supplier<A> reducer)
Using parallelism Parallelism.CORES.
-
reduceMergeable
public <W, R, A extends TwoStepMapper.Mergeable<W,R>> R reduceMergeable(Collection<W> work, int parallelism, Supplier<A> reducer)
Will create at most parallelism tasks to work through the work items, processing them with reducer. The results of each task's reducer will be merged into a single instance, and the results of that instance will be returned.
Each TwoStepMapper.Mergeable is only worked on by a single thread, and the results are combined into a single instance. The instances are not reused.
Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
reducer - A TwoStepMapper.Mergeable implementation that does what you want.
Returns:
The results...
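The reduce pattern described above (one reducer instance per task, each touched by a single thread, with the partial results merged at the end) can be sketched with plain JDK classes. The Partial interface and WordCounter class below are hypothetical stand-ins invented for this example; they are not ojAlgo's TwoStepMapper types, and the method names on them are assumptions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of ojAlgo.
final class ReduceSketch {

    /** Hypothetical stand-in for a mergeable reducer. */
    interface Partial<W, R> {
        void consume(W item);
        void merge(R otherResults);
        R getResults();
    }

    /** Example reducer: counts how often each word occurs. */
    static final class WordCounter implements Partial<String, Map<String, Integer>> {
        private final Map<String, Integer> counts = new HashMap<>();
        public void consume(String item) { counts.merge(item, 1, Integer::sum); }
        public void merge(Map<String, Integer> other) {
            other.forEach((word, n) -> counts.merge(word, n, Integer::sum));
        }
        public Map<String, Integer> getResults() { return counts; }
    }

    static <W, R, A extends Partial<W, R>> R reduce(Collection<W> work, int parallelism, Supplier<A> reducer) {
        Queue<W> pending = new ConcurrentLinkedQueue<>(work); // assumes non-null items
        int workers = Math.max(1, Math.min(parallelism, work.size()));
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        try {
            List<Future<A>> futures = new ArrayList<>();
            for (int i = 0; i < workers; i++) {
                futures.add(executor.submit(() -> {
                    A partial = reducer.get(); // one instance per task, used by one thread only
                    W item;
                    while ((item = pending.poll()) != null) {
                        partial.consume(item);
                    }
                    return partial;
                }));
            }
            A total = reducer.get(); // fresh instance that receives all partial results
            for (Future<A> future : futures) {
                total.merge(future.get().getResults());
            }
            return total.getResults();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }
}
```

Because each partial reducer is confined to one task, the HashMap inside WordCounter needs no synchronisation; merging happens on the calling thread only after each task has completed.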
-
reduceMergeable
public <W, R, A extends TwoStepMapper.Mergeable<W,R>> R reduceMergeable(Collection<W> work, IntSupplier parallelism, Supplier<A> reducer)
-
reduceMergeable
public <W, R, A extends TwoStepMapper.Mergeable<W,R>> R reduceMergeable(Collection<W> work, Supplier<A> reducer)
Using parallelism Parallelism.CORES.
-
run
Will create precisely parallelism tasks that each execute the processor.
Parameters:
parallelism - The number of concurrent workers/threads that will run
processor - The processing code
-
run
-
run
-
take
Will submit precisely parallelism tasks that each take from the queue feeding the items to the processor. The tasks will continue to run until the returned AtomicBoolean is set to false (or the thread is interrupted).
If the threads of the underlying ExecutorService are daemon threads, the JVM will not wait for them to finish before it exits. The default behaviour, using INSTANCE or newInstance(String), is to make use of ojAlgo's DaemonPoolExecutor.
Type Parameters:
T - The work item type
Parameters:
queue - The queue to take from
parallelism - How many parallel workers to create
processor - What to do with each of the work items
Returns:
A flag that can be used to signal the tasks to stop
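The stop-flag pattern described above can be sketched as follows, using only JDK classes. This is an illustration under stated assumptions, not ojAlgo's code: the class TakeSketch and the explicit executor parameter are hypothetical, and the timed poll (instead of a blocking take) is a choice made here so that the flag is re-checked even while the queue is empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical illustration only; not part of ojAlgo.
final class TakeSketch {

    /** Submits exactly `parallelism` tasks that feed queue items to `processor` until the flag is cleared. */
    static <T> AtomicBoolean take(ExecutorService executor, BlockingQueue<T> queue, int parallelism,
            Consumer<T> processor) {
        AtomicBoolean active = new AtomicBoolean(true);
        for (int i = 0; i < parallelism; i++) {
            executor.submit(() -> {
                while (active.get()) {
                    try {
                        // Assumption: poll with a timeout so the flag is re-checked on an empty queue.
                        T item = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            processor.accept(item);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt and stop
                        return;
                    }
                }
            });
        }
        return active;
    }
}
```

A caller keeps the returned flag, adds items to the queue from any thread, and later calls set(false) on the flag to let the workers finish. With non-daemon executor threads, as in a pool from Executors.newFixedThreadPool, the executor must also be shut down for the JVM to exit.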
-
take
public <T> AtomicBoolean take(BlockingQueue<T> queue, IntSupplier parallelism, Consumer<T> processor)
-