33 #include "../common/WLogger.h"
34 #include "../common/WThreadedRunner.h"
35 #include "WBatchLoader.h"
38 #include "WModuleCombiner.h"
39 #include "WModuleFactory.h"
40 #include "WModuleInputConnector.h"
41 #include "WModuleOutputConnector.h"
42 #include "WModuleTypes.h"
43 #include "combiner/WApplyCombiner.h"
44 #include "exceptions/WModuleAlreadyAssociated.h"
45 #include "exceptions/WModuleSignalSubscriptionFailed.h"
46 #include "exceptions/WModuleUninitialized.h"
47 #include "WDataModule.h"
49 #include "WModuleContainer.h"
54 m_description( description ),
55 m_crashIfModuleCrashes( true )
88 "ModuleContainer (" +
getName() +
")", LL_INFO );
90 if( !module->isInitialized()() )
93 s <<
"Could not add module \"" << module->getName() <<
"\" to container \"" +
getName() +
"\". Reason: module not initialized.";
99 if( module->getAssociatedContainer() == shared_from_this() )
102 "ModuleContainer (" +
getName() +
")", LL_INFO );
107 if( module->isAssociated()() )
109 module->getAssociatedContainer()->remove( module );
114 wlock->get().insert( module );
117 module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) );
128 boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
132 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>(
m_errorNotifiersLock );
135 signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
149 for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
151 signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) );
159 for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
161 signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) );
168 signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
174 subscriptionsLock.reset();
177 m_progress->addSubProgress( module->getRootProgressCombiner() );
193 if( module->getAssociatedContainer() != shared_from_this() )
199 module->disconnect();
202 m_progress->removeSubProgress( module->getRootProgressCombiner() );
208 std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module );
212 ( *it ).second.disconnect();
215 subscriptionsLock->get().erase( subscriptions.first, subscriptions.second );
216 subscriptionsLock.reset();
220 wlock->get().erase( module );
223 module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
226 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>(
m_removedNotifiersLock );
254 if( ( *iter )->getType() == MODULE_DATA )
256 boost::shared_ptr< WDataModule > dm = boost::shared_static_cast<
WDataModule >( *iter );
259 if( dm->isReady()() )
274 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>(
m_pendingThreadsLock );
278 ( *listIter )->wait(
true );
287 for(
ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
290 "ModuleContainer (" +
getName() +
")", LL_INFO );
291 ( *listIter )->wait(
true );
292 ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
299 wlock->get().clear();
314 boost::unique_lock<boost::shared_mutex> lock;
333 std::ostringstream s;
334 s <<
"Could not subscribe to unknown signal.";
342 boost::unique_lock<boost::shared_mutex> lock;
351 std::ostringstream s;
352 s <<
"Could not subscribe to unknown signal.";
360 boost::unique_lock<boost::shared_mutex> lock;
363 case CONNECTION_ESTABLISHED:
368 case CONNECTION_CLOSED:
374 std::ostringstream s;
375 s <<
"Could not subscribe to unknown signal.";
383 boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
402 boost::shared_ptr< WModule > prototype )
405 if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
408 std::string(
"\" is associated with another container." ) );
416 applyOn->isReadyOrCrashed().wait();
417 m->isReadyOrCrashed().wait();
428 if( !ins.empty() && !outs.empty() )
430 ( *ins.begin() )->connect( ( *outs.begin() ) );
439 boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >(
new WBatchLoader( fileNames,
440 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
449 boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >(
new WBatchLoader( fileNames,
450 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
458 boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>(
m_pendingThreadsLock );
465 boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>(
m_pendingThreadsLock );
472 errorLog() <<
"Error in module \"" << module->getName() <<
"\". Forwarding to nesting container.";
479 infoLog() <<
"Crash caused this container to shutdown.";
497 WCombinerTypes::WCompatiblesList complist;
509 for(
ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
511 WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) );
513 if( lComp.size() != 0 )
515 complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) );
520 std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort );