OpenWalnut  1.2.5
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
WModuleContainer.cpp
1 //---------------------------------------------------------------------------
2 //
3 // Project: OpenWalnut ( http://www.openwalnut.org )
4 //
5 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
6 // For more information see http://www.openwalnut.org/copying
7 //
8 // This file is part of OpenWalnut.
9 //
10 // OpenWalnut is free software: you can redistribute it and/or modify
11 // it under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 3 of the License, or
13 // (at your option) any later version.
14 //
15 // OpenWalnut is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
22 //
23 //---------------------------------------------------------------------------
24 
25 #include <list>
26 #include <set>
27 #include <vector>
28 #include <string>
29 #include <sstream>
30 #include <algorithm>
31 #include <utility>
32 
33 #include "../common/WLogger.h"
34 #include "../common/WThreadedRunner.h"
35 #include "WBatchLoader.h"
36 #include "WKernel.h"
37 #include "WModule.h"
38 #include "WModuleCombiner.h"
39 #include "WModuleFactory.h"
40 #include "WModuleInputConnector.h"
41 #include "WModuleOutputConnector.h"
42 #include "WModuleTypes.h"
43 #include "combiner/WApplyCombiner.h"
44 #include "exceptions/WModuleAlreadyAssociated.h"
45 #include "exceptions/WModuleSignalSubscriptionFailed.h"
46 #include "exceptions/WModuleUninitialized.h"
47 #include "WDataModule.h"
48 
49 #include "WModuleContainer.h"
50 
51 WModuleContainer::WModuleContainer( std::string name, std::string description ):
52  WModule(),
53  m_name( name ),
54  m_description( description ),
55  m_crashIfModuleCrashes( true )
56 {
57  WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG );
58  // initialize members
59 }
60 
62 {
63  // cleanup
64 }
65 
// NOTE(review): the line carrying this function's signature is absent from this listing (the
// embedded numbering jumps from 65 to 67). Judging by the comment below, this is presumably the
// main routine that WModule requires derived classes to provide; restore the signature from the
// upstream file.
67 {
68  // Nothing to do here. WModule forces this method to be overridden, but the container has no work of its own.
69  // Only set the ready flag.
70  ready();
71 }
72 
73 boost::shared_ptr< WModule > WModuleContainer::factory() const
74 {
75  // this factory is not used actually.
76  return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) );
77 }
78 
// Adds `module` to this container and wires it up to the container's handlers.
//
// Behaviour visible in this function:
//  - a null `module` is silently ignored;
//  - WModuleUninitialized is thrown if module->isInitialized()() is false;
//  - nothing more is done if the module is already associated with this container;
//  - a module that is associated with another container is removed from that container first;
//  - the container's own error handler (moduleError) and the registered error, connector and ready
//    notifiers are subscribed, and each subscription is recorded through `subscriptionsLock`;
//  - the registered "associated" notifiers are called directly with the module;
//  - the module's root progress combiner is added to m_progress;
//  - module->run() is called when `run` is true.
//
// NOTE(review): two statements are absent from this listing (the embedded numbering skips 113 and
// 124): the declarations of `wlock` and `subscriptionsLock`. Restore them from the upstream file.
79 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
80 {
81  if( !module )
82  {
83  // just ignore a NULL pointer
84  return;
85  }
86 
87  WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
88  "ModuleContainer (" + getName() + ")", LL_INFO );
89 
90  if( !module->isInitialized()() )
91  {
92  std::ostringstream s;
93  s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";
94 
95  throw WModuleUninitialized( s.str() );
96  }
97 
98  // already associated with this container?
99  if( module->getAssociatedContainer() == shared_from_this() )
100  {
101  WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." ,
102  "ModuleContainer (" + getName() + ")", LL_INFO );
103  return;
104  }
105 
106  // is this module associated with some other container? Then take it out of there first.
107  if( module->isAssociated()() )
108  {
109  module->getAssociatedContainer()->remove( module );
110  }
111 
112  // get write lock
 // NOTE(review): the declaration of `wlock` (listing line 113) is missing here.
114  wlock->get().insert( module );
115  wlock.reset();
116 
117  module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) );
118  WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
119  LL_INFO );
120 
121  // now module->isUsable() is true
122 
123  // Connect the error handler and all default handlers:
 // NOTE(review): the declaration of `subscriptionsLock` (listing line 124) is missing here.
125 
126  // connect the container's signal handler explicitly
127  t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 );
128  boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
129  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
130 
131  // connect default notifiers; `slock` is re-assigned to the lock guarding each notifier list in turn
132  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
133  for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
134  {
135  signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
136  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
137  }
138  slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
139  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
140  {
141  // call associated notifier directly; no subscription is recorded for it
142  ( *iter )( module );
143  }
144  slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock );
145  for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin();
146  iter != m_connectorEstablishedNotifiers.end(); ++iter )
147  {
148  // subscribe on each input connector of the module
149  for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
150  {
151  signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) );
152  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
153  }
154  }
155  for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin();
156  iter != m_connectorClosedNotifiers.end(); ++iter )
157  {
158  // subscribe on each input connector of the module
159  for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
160  {
161  signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) );
162  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
163  }
164  }
165  slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
166  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
167  {
168  signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
169  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
170  }
171  slock.unlock();
172 
173  // free the subscriptions lock
174  subscriptionsLock.reset();
175 
176  // add the module's progress to the local progress combiner
177  m_progress->addSubProgress( module->getRootProgressCombiner() );
178 
179  // run it
180  if( run )
181  {
182  module->run();
183  }
184 }
185 
// Removes `module` from this container (flat removal of the given module only).
//
// If the module is not associated with this container, nothing happens apart from the log message.
// Otherwise the module is disconnected, its root progress combiner is removed from m_progress,
// all subscriptions recorded for it are disconnected and erased, it is erased from the module
// list, its associated container is reset to an empty pointer, and every registered "removed"
// notifier is called with the module.
//
// NOTE(review): `module` is dereferenced for the log message before any null check, so a null
// pointer would be dereferenced here; confirm whether callers guarantee a non-null argument.
// NOTE(review): two statements are absent from this listing (the embedded numbering skips 205 and
// 219): the declarations of `subscriptionsLock` and `wlock`. Restore them from the upstream file.
186 void WModuleContainer::remove( boost::shared_ptr< WModule > module )
187 {
188  // simple flat removal.
189 
190  WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
191  LL_DEBUG );
192 
193  if( module->getAssociatedContainer() != shared_from_this() )
194  {
195  return;
196  }
197 
198  // remove connections inside this container
199  module->disconnect();
200 
201  // remove progress combiner
202  m_progress->removeSubProgress( module->getRootProgressCombiner() );
203 
204  // remove signal subscriptions to this container's default notifiers
 // NOTE(review): the declaration of `subscriptionsLock` (listing line 205) is missing here.
206 
207  // find all subscriptions for this module
208  std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module );
209  for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it )
210  {
211  // disconnect subscription.
212  ( *it ).second.disconnect();
213  }
214  // erase them
215  subscriptionsLock->get().erase( subscriptions.first, subscriptions.second );
216  subscriptionsLock.reset();
217 
218  // get write lock
 // NOTE(review): the declaration of `wlock` (listing line 219) is missing here.
220  wlock->get().erase( module );
221  wlock.reset();
222 
223  module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
224 
225  // tell all interested parties about the removal
226  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock );
227  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter)
228  {
229  // call the removed-notifier
230  ( *iter )( module );
231  }
232  slock.unlock();
233 }
234 
235 void WModuleContainer::removeDeep( boost::shared_ptr< WModule > module )
236 {
237  WLogger::getLogger()->addLogMessage( "Deep removal of modules is not yet implemented.", "ModuleContainer (" + getName() + ")", LL_WARNING );
238 
239  // at least, remove the module itself
240  remove( module );
241 }
242 
// Collects the ready data modules of this container: every module in the list whose getType()
// equals MODULE_DATA is cast to WDataModule and inserted into `l` if dm->isReady()() is true.
// `l` is returned.
//
// NOTE(review): three lines are absent from this listing (the embedded numbering skips 243, 245
// and 248): the function signature, the declaration of the result `l`, and the declaration of
// `lock`. Restore them from the upstream file.
244 {
246 
247  // lock, released once it goes out of scope
249 
250  // iterate module list
251  for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter )
252  {
253  // is this module a data module?
254  if( ( *iter )->getType() == MODULE_DATA )
255  {
256  boost::shared_ptr< WDataModule > dm = boost::shared_static_cast< WDataModule >( *iter );
257 
258  // only readiness of the data module is checked here
259  if( dm->isReady()() )
260  {
261  l.insert( dm );
262  }
263  }
264  }
265 
266  return l;
267 }
268 
// Shuts the container down in three steps: wait( true ) is called on every pending thread, then
// on every module in the list (each module also gets its associated container reset to an empty
// pointer), and finally the module list is cleared.
// NOTE(review): wait( true ) presumably requests a stop before waiting; confirm in WThreadedRunner.
//
// NOTE(review): three lines are absent from this listing (the embedded numbering skips 269, 285
// and 298): the function signature and the declarations of `lock` and `wlock`. Restore them from
// the upstream file.
270 {
271  WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
272 
273  // read lock on the set of pending threads
274  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
275  for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
276  ++listIter )
277  {
278  ( *listIter )->wait( true );
279  }
280  slock.unlock();
281 
282  WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
283 
284  // lock on the module list; released explicitly by lock.reset() below
286 
287  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
288  {
289  WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
290  "ModuleContainer (" + getName() + ")", LL_INFO );
291  ( *listIter )->wait( true );
292  ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); // remove last refs to this container inside the module
293  }
294  lock.reset();
295 
296  // get write lock
297  // lock, released once it goes out of scope
299  wlock->get().clear();
300 }
301 
302 const std::string WModuleContainer::getName() const
303 {
304  return m_name;
305 }
306 
307 const std::string WModuleContainer::getDescription() const
308 {
309  return m_description;
310 }
311 
312 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
313 {
314  boost::unique_lock<boost::shared_mutex> lock;
315  switch (signal)
316  {
317  case WM_ASSOCIATED:
318  lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock );
319  m_associatedNotifiers.push_back( notifier );
320  lock.unlock();
321  break;
322  case WM_READY:
323  lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock );
324  m_readyNotifiers.push_back( notifier );
325  lock.unlock();
326  break;
327  case WM_REMOVED:
328  lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock );
329  m_removedNotifiers.push_back( notifier );
330  lock.unlock();
331  break;
332  default:
333  std::ostringstream s;
334  s << "Could not subscribe to unknown signal.";
335  throw WModuleSignalSubscriptionFailed( s.str() );
336  break;
337  }
338 }
339 
340 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
341 {
342  boost::unique_lock<boost::shared_mutex> lock;
343  switch (signal)
344  {
345  case WM_ERROR:
346  lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock );
347  m_errorNotifiers.push_back( notifier );
348  lock.unlock();
349  break;
350  default:
351  std::ostringstream s;
352  s << "Could not subscribe to unknown signal.";
353  throw WModuleSignalSubscriptionFailed( s.str() );
354  break;
355  }
356 }
357 
358 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier )
359 {
360  boost::unique_lock<boost::shared_mutex> lock;
361  switch (signal)
362  {
363  case CONNECTION_ESTABLISHED:
364  lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
365  m_connectorEstablishedNotifiers.push_back( notifier );
366  lock.unlock();
367  break;
368  case CONNECTION_CLOSED:
369  lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
370  m_connectorClosedNotifiers.push_back( notifier );
371  lock.unlock();
372  break;
373  default:
374  std::ostringstream s;
375  s << "Could not subscribe to unknown signal.";
376  throw WModuleSignalSubscriptionFailed( s.str() );
377  break;
378  }
379 }
380 
381 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
382 {
383  boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
384  if( tryOnly )
385  {
386  // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception
387  prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
388  if( !prototype )
389  {
390  return prototype;
391  }
392  }
393  else
394  {
395  prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
396  }
397 
398  return applyModule( applyOn, prototype );
399 }
400 
401 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
402  boost::shared_ptr< WModule > prototype )
403 {
404  // is this module already associated with another container?
405  if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
406  {
407  throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() +
408  std::string( "\" is associated with another container." ) );
409  }
410 
411  // create a new initialized instance of the module
412  boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );
413 
414  // add it
415  add( m, true );
416  applyOn->isReadyOrCrashed().wait();
417  m->isReadyOrCrashed().wait();
418 
419  // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a
420  // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here.
421 
422  // get offered outputs
423  WModule::InputConnectorList ins = m->getInputConnectors();
424  // get offered inputs
425  WModule::OutputConnectorList outs = applyOn->getOutputConnectors();
426 
427  // connect the first connectors. For a more sophisticated way of connecting modules, use ModuleCombiners.
428  if( !ins.empty() && !outs.empty() )
429  {
430  ( *ins.begin() )->connect( ( *outs.begin() ) );
431  }
432 
433  return m;
434 }
435 
436 boost::shared_ptr< WBatchLoader > WModuleContainer::loadDataSets( std::vector< std::string > fileNames )
437 {
438  // create thread which actually loads the data
439  boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
440  boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
441  );
442  t->run();
443  return t;
444 }
445 
446 void WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > fileNames )
447 {
448  // create thread which actually loads the data
449  boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
450  boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
451  );
452  t->run();
453  t->wait();
454 }
455 
456 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
457 {
458  boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
459  m_pendingThreads.insert( thread );
460  lock.unlock();
461 }
462 
463 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
464 {
465  boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
466  m_pendingThreads.erase( thread );
467  lock.unlock();
468 }
469 
470 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception )
471 {
472  errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container.";
473 
474  // simply forward it to the other signal handler
475  signal_error( module, exception );
476 
478  {
479  infoLog() << "Crash caused this container to shutdown.";
480  requestStop();
481  m_isCrashed( true );
482  }
483 }
484 
485 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed )
486 {
487  m_crashIfModuleCrashes = crashIfCrashed;
488 }
489 
// Returns a read ticket on the module list (m_modules.getReadTicket()).
//
// NOTE(review): the line carrying this function's signature is absent from this listing (the
// embedded numbering jumps from 489 to 491); restore it from the upstream file.
491 {
492  return m_modules.getReadTicket();
493 }
494 
// Builds the list of possible connections for `module`: for every module in this container,
// WApplyCombiner::createCombinerList< WApplyCombiner >( module, other ) is evaluated and each
// non-empty result is appended as a WCompatiblesGroup. The list is sorted with
// WCombinerTypes::compatiblesSort before it is returned. A null `module` yields an empty list.
//
// NOTE(review): the declaration of `lock` is absent from this listing (the embedded numbering
// skips line 506); restore it from the upstream file.
495 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module )
496 {
497  WCombinerTypes::WCompatiblesList complist;
498 
499  if( !module )
500  {
501  // be nice in case of a null pointer
502  return complist;
503  }
504 
505  // read lock the container
 // NOTE(review): the declaration of `lock` (listing line 506) is missing here.
507 
508  // handle each module
509  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
510  {
511  WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) );
512 
513  if( lComp.size() != 0 )
514  {
515  complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) );
516  }
517  }
518 
519  // sort the compatibles
520  std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort );
521 
522  return complist;
523 }
524