Class NonThreadedProcessingUnit
- java.lang.Object
-
- org.apache.uima.collection.impl.cpm.engine.NonThreadedProcessingUnit
-
public class NonThreadedProcessingUnit extends java.lang.ObjectThe Class NonThreadedProcessingUnit.
-
-
Field Summary
Fields Modifier and Type Field Description protected java.lang.Object[]artifactThe artifact.private CAS[]casCacheThe cas cache.protected CAS[]casListThe cas list.protected CPECasPoolcasPoolThe cas pool.protected CASconversionCasThe conversion cas.protected CAS[]conversionCasArrayThe conversion cas array.protected CpeConfigurationcpeConfigurationThe cpe configuration.protected CPMEnginecpmThe cpm.protected CasConvertermConverterThe m converter.protected booleannotifyListenersThe notify listeners.protected longnumToProcessThe num to process.protected BoundedWorkQueueoutputQueueThe output queue.protected java.util.LinkedListprocessContainersThe process containers.protected ProcessTraceprocessingUnitProcessTraceThe processing unit process trace.protected booleanrelaseCASThe relase CAS.protected java.util.ArrayListstatusCbLThe status cb L.protected java.lang.StringthreadIdThe thread id.intthreadStateThe thread state.protected UimaTimertimerThe timer.protected BoundedWorkQueueworkQueueThe work queue.
-
Constructor Summary
Constructors Constructor Description NonThreadedProcessingUnit(CPMEngine acpm)Instantiates a new non threaded processing unit.NonThreadedProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)Initialize the PU.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description voidaddStatusCallbackListener(BaseStatusCallbackListener aListener)Plugs in Listener object used for notifications.protected booleananalyze(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp)Analyze.voidcleanup()Null out fields of this object.private voidclearCasCache()Clear cas cache.private booleancontainerDisabled(ProcessingContainer aContainer)Container disabled.private voidconvertCasDataToCasObject(int casIndex, java.lang.String aContainerName, java.lang.Object[] aCasObjectList)Convert cas data to cas object.voiddisableCasProcessor(int aCasProcessorIndex)Disable a CASProcessor in the processing pipeline.voiddisableCasProcessor(java.lang.String aCasProcessorName)Alternative method to disable Cas Processor.private voiddoEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, int howManyCases)Do end of batch.protected voiddoNotifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)Notifies all configured listeners.private voiddoReleaseCasProcessor(ProcessingContainer aContainer, CasProcessor aCasProcessor)Do release cas processor.voidenableCasProcessor(java.lang.String aCasProcessorName)Enables Cas Processor with a given name.private booleanfilterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject, java.lang.Object[] aCasObjectList)Filter out the CAS.protected longgetBytes(java.lang.Object aCas)Returns the size of the CAS object.java.util.ArrayListgetCallbackListeners()Returns list of listeners used by this PU for callbacks.private booleanhandleErrors(java.lang.Throwable e, ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTrace, java.lang.Object[] aCasObjectList, boolean isCasObject)Main routine that handles errors occuring in the processing loop.private voidhandleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, java.lang.Exception ex)Handle service exception.private voidhandleSkipCasProcessor(ProcessingContainer aContainer, java.lang.Object[] aCasObjectList, boolean isLastCP)Handle skip cas processor.private voidinvokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor, java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry)Invoke cas data cas processor.private voidinvokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor, java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject)Invoke cas object cas processor.protected booleanisProcessorReady(int aStatus)Check if the CASProcessor status is available for processing.protected voidnotifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)Notifies Listeners of the fact that the pipeline has finished processing the current set Cas'es.private booleanpauseContainer(ProcessingContainer aContainer, java.lang.Exception aException, java.lang.String aThreadId)Determines if the thread should be paused.private voidpostAnalysis(java.lang.Object[] aCasObjectList, boolean isCasObject, java.lang.Object[] casObjects, ProcessTrace aProcessTr, boolean doneAlready)Post analysis.private voidreleaseCases(java.lang.Object aCasList, boolean lastProcessor, java.lang.String aName)Conditionally, releases CASes back to the CAS pool.voidremoveStatusCallbackListener(BaseStatusCallbackListener aListener)Removes given listener from the list of listeners.voidsetCasPool(CPECasPool aPool)Sets the cas pool.voidsetContainers(java.util.LinkedList processorList)Plugs in a list of Cas Processor containers.voidsetCPMEngine(CPMEngine acpm)Alternative method of providing the reference to the component managing the lifecycle of the CPE.voidsetInputQueue(BoundedWorkQueue aInputQueue)Alternative method of providing a queue from which this PU will read bundle of Cas.voidsetNotifyListeners(boolean aDoNotify)Set a flag indicating if notifications should be made via configured Listeners.voidsetOutputQueue(BoundedWorkQueue aOutputQueue)Alternative method of providing a queue where this PU will deposit results of analysis.voidsetProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)Plugs in ProcessTrace object used to collect statistics.voidsetReleaseCASFlag(boolean aFlag)Sets the release CAS flag.voidsetUimaTimer(UimaTimer aTimer)Plugs in custom timer used by the PU for getting time.voidstopCasProcessors(boolean kill)Stops all Cas Processors that are part of this PU.
-
-
-
Field Detail
-
threadState
public int threadState
The thread state.
-
casPool
protected CPECasPool casPool
The cas pool.
-
relaseCAS
protected boolean relaseCAS
The relase CAS.
-
cpm
protected CPMEngine cpm
The cpm.
-
workQueue
protected BoundedWorkQueue workQueue
The work queue.
-
outputQueue
protected BoundedWorkQueue outputQueue
The output queue.
-
mConverter
protected CasConverter mConverter
The m converter.
-
processingUnitProcessTrace
protected ProcessTrace processingUnitProcessTrace
The processing unit process trace.
-
processContainers
protected java.util.LinkedList processContainers
The process containers.
-
numToProcess
protected long numToProcess
The num to process.
-
casList
protected CAS[] casList
The cas list.
-
statusCbL
protected java.util.ArrayList statusCbL
The status cb L.
-
notifyListeners
protected boolean notifyListeners
The notify listeners.
-
conversionCas
protected CAS conversionCas
The conversion cas.
-
artifact
protected java.lang.Object[] artifact
The artifact.
-
conversionCasArray
protected CAS[] conversionCasArray
The conversion cas array.
-
timer
protected UimaTimer timer
The timer.
-
threadId
protected java.lang.String threadId
The thread id.
-
cpeConfiguration
protected CpeConfiguration cpeConfiguration
The cpe configuration.
-
casCache
private CAS[] casCache
The cas cache.
-
-
Constructor Detail
-
NonThreadedProcessingUnit
public NonThreadedProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
Initialize the PU.- Parameters:
acpm- - component managing life cycle of the CPEaInputQueue- - queue to read fromaOutputQueue- - queue to write to
-
NonThreadedProcessingUnit
public NonThreadedProcessingUnit(CPMEngine acpm)
Instantiates a new non threaded processing unit.- Parameters:
acpm- the acpm
-
-
Method Detail
-
setInputQueue
public void setInputQueue(BoundedWorkQueue aInputQueue)
Alternative method of providing a queue from which this PU will read bundle of Cas.- Parameters:
aInputQueue- - read queue
-
setOutputQueue
public void setOutputQueue(BoundedWorkQueue aOutputQueue)
Alternative method of providing a queue where this PU will deposit results of analysis.- Parameters:
aOutputQueue- - queue to write to
-
setCPMEngine
public void setCPMEngine(CPMEngine acpm)
Alternative method of providing the reference to the component managing the lifecycle of the CPE.- Parameters:
acpm- - reference to the contrlling engine
-
cleanup
public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.
-
setNotifyListeners
public void setNotifyListeners(boolean aDoNotify)
Set a flag indicating if notifications should be made via configured Listeners.- Parameters:
aDoNotify- - true if notification is required, false otherwise
-
addStatusCallbackListener
public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
Plugs in Listener object used for notifications.- Parameters:
aListener- -BaseStatusCallbackListenerinstance
-
getCallbackListeners
public java.util.ArrayList getCallbackListeners()
Returns list of listeners used by this PU for callbacks.- Returns:
- - lif of
BaseStatusCallbackListenerinstances
-
removeStatusCallbackListener
public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
Removes given listener from the list of listeners.- Parameters:
aListener- - object to remove from the list
-
setProcessingUnitProcessTrace
public void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
Plugs in ProcessTrace object used to collect statistics.- Parameters:
aProcessingUnitProcessTrace- - object to compile stats
-
setUimaTimer
public void setUimaTimer(UimaTimer aTimer)
Plugs in custom timer used by the PU for getting time.- Parameters:
aTimer- - custom timer to use
-
setContainers
public void setContainers(java.util.LinkedList processorList)
Plugs in a list of Cas Processor containers. During processing Cas Processors in this list are called sequentially. Each Cas Processor is contained in the container that is managing errors, counts and totals, and restarts.- Parameters:
processorList- CASProcessor to be added to the processing pipeline
-
disableCasProcessor
public void disableCasProcessor(int aCasProcessorIndex)
Disable a CASProcessor in the processing pipeline. Locate it by provided index. The disabled Cas Processor remains in the Processing Pipeline, however it is not used furing processing.- Parameters:
aCasProcessorIndex- - location in the pipeline of the Cas Processor to delete
-
disableCasProcessor
public void disableCasProcessor(java.lang.String aCasProcessorName)
Alternative method to disable Cas Processor. Uses a name to locate it.- Parameters:
aCasProcessorName- - a name of the Cas Processor to disable
-
enableCasProcessor
public void enableCasProcessor(java.lang.String aCasProcessorName)
Enables Cas Processor with a given name. Enabled Cas Processor will immediately begin to receive bundles of Cas.- Parameters:
aCasProcessorName- - name of the Cas Processor to enable
-
analyze
protected boolean analyze(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) throws java.lang.ExceptionAnalyze.- Parameters:
aCasObjectList- the a cas object listpTrTemp- the tr temp- Returns:
- true, if successful
- Throws:
java.lang.Exception- the exception
-
setReleaseCASFlag
public void setReleaseCASFlag(boolean aFlag)
Sets the release CAS flag.- Parameters:
aFlag- the new release CAS flag
-
setCasPool
public void setCasPool(CPECasPool aPool)
Sets the cas pool.- Parameters:
aPool- the new cas pool
-
postAnalysis
private void postAnalysis(java.lang.Object[] aCasObjectList, boolean isCasObject, java.lang.Object[] casObjects, ProcessTrace aProcessTr, boolean doneAlready) throws java.lang.ExceptionPost analysis.- Parameters:
aCasObjectList- the a cas object listisCasObject- the is cas objectcasObjects- the cas objectsaProcessTr- the a process trdoneAlready- the done already- Throws:
java.lang.Exception- -
-
doReleaseCasProcessor
private void doReleaseCasProcessor(ProcessingContainer aContainer, CasProcessor aCasProcessor)
Do release cas processor.- Parameters:
aContainer- the a containeraCasProcessor- the a cas processor
-
doEndOfBatch
private void doEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, int howManyCases)
Do end of batch.- Parameters:
aContainer- the a containeraProcessor- the a processoraProcessTr- the a process trhowManyCases- the how many cases
-
handleErrors
private boolean handleErrors(java.lang.Throwable e, ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTrace, java.lang.Object[] aCasObjectList, boolean isCasObject) throws java.lang.ExceptionMain routine that handles errors occuring in the processing loop.- Parameters:
e- - exception in the main processing loopaContainer- - current container of the Cas ProcessoraProcessor- - current Cas ProcessoraProcessTrace- - an object containing stats for this procesing loopaCasObjectList- - list of CASes being analyzedisCasObject- - determines type of CAS in the aCasObjectList ( CasData or CasObject)- Returns:
- boolean
- Throws:
java.lang.Exception- -
-
invokeCasObjectCasProcessor
private void invokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor, java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject) throws java.lang.Exception
Invoke cas object cas processor.- Parameters:
container- the containerprocessor- the processoraCasObjectList- the a cas object listpTrTemp- the tr tempisCasObject- the is cas object- Throws:
java.lang.Exception- -
-
convertCasDataToCasObject
private void convertCasDataToCasObject(int casIndex, java.lang.String aContainerName, java.lang.Object[] aCasObjectList) throws java.lang.ExceptionConvert cas data to cas object.- Parameters:
casIndex- the cas indexaContainerName- the a container nameaCasObjectList- the a cas object list- Throws:
java.lang.Exception- -
-
invokeCasDataCasProcessor
private void invokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor, java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry) throws java.lang.Exception
Invoke cas data cas processor.- Parameters:
container- the containerprocessor- the processoraCasObjectList- the a cas object listpTrTemp- the tr tempisCasObject- the is cas objectretry- the retry- Throws:
java.lang.Exception- -
-
containerDisabled
private boolean containerDisabled(ProcessingContainer aContainer)
Container disabled.- Parameters:
aContainer- the a container- Returns:
- true, if successful
-
isProcessorReady
protected boolean isProcessorReady(int aStatus)
Check if the CASProcessor status is available for processing.- Parameters:
aStatus- the a status- Returns:
- true, if is processor ready
-
filterOutTheCAS
private boolean filterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject, java.lang.Object[] aCasObjectList)
Filter out the CAS.- Parameters:
aContainer- the a containerisCasObject- the is cas objectaCasObjectList- the a cas object list- Returns:
- true, if successful
-
notifyListeners
protected void notifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)Notifies Listeners of the fact that the pipeline has finished processing the current set Cas'es.- Parameters:
aCas- - object containing an array of OR a single instance of CasisCasObject- - true if instance of Cas is of type Cas, false otherwiseaEntityProcStatus- - status object that may contain exceptions and trace
-
doNotifyListeners
protected void doNotifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)Notifies all configured listeners. Makes sure that appropriate type of Cas is sent to the listener. Convertions take place to ensure compatibility.- Parameters:
aCas- - Cas to pass to listenerisCasObject- - true is Cas is of type CASaEntityProcStatus- - status object containing exceptions and trace info
-
clearCasCache
private void clearCasCache()
Clear cas cache.
-
pauseContainer
private boolean pauseContainer(ProcessingContainer aContainer, java.lang.Exception aException, java.lang.String aThreadId)
Determines if the thread should be paused. Pausing container effectively pauses ALL Cas Processors that are managed by the container. The pause is needed when there are multiple pipelines shareing a common service. If this service dies (Socket Down), only one thread should initiate service restart. While the service is being restarted no invocations on the service should be done. Containers will be resumed on successfull service restart.- Parameters:
aContainer- - a container that manages the current Cas Processor.aException- the a exceptionaThreadId- - id of the current thread- Returns:
- true, if successful
-
handleServiceException
private void handleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, java.lang.Exception ex) throws java.lang.Exception
Handle service exception.- Parameters:
aContainer- the a containeraProcessor- the a processoraProcessTr- the a process trex- the ex- Throws:
java.lang.Exception- -
-
handleSkipCasProcessor
private void handleSkipCasProcessor(ProcessingContainer aContainer, java.lang.Object[] aCasObjectList, boolean isLastCP) throws java.lang.Exception
Handle skip cas processor.- Parameters:
aContainer- the a containeraCasObjectList- the a cas object listisLastCP- the is last CP- Throws:
java.lang.Exception- -
-
getBytes
protected long getBytes(java.lang.Object aCas)
Returns the size of the CAS object. Currently only CASData is supported.- Parameters:
aCas- - Cas to get the size for- Returns:
- the size of the CAS object. Currently only CASData is supported.
-
releaseCases
private void releaseCases(java.lang.Object aCasList, boolean lastProcessor, java.lang.String aName)Conditionally, releases CASes back to the CAS pool. The release only occurs if the Cas Processor is the last in the processing chain.- Parameters:
aCasList- - list of CASes to releaselastProcessor- - determines if the release takes placeaName- the a name
-
stopCasProcessors
public void stopCasProcessors(boolean kill)
Stops all Cas Processors that are part of this PU.- Parameters:
kill- - true if CPE has been stopped before finishing processing during external stop
-
-