diff options
author | Kshitij Bansal <kshitij@cs.nyu.edu> | 2012-09-08 14:25:25 +0000 |
---|---|---|
committer | Kshitij Bansal <kshitij@cs.nyu.edu> | 2012-09-08 14:25:25 +0000 |
commit | cfa9e2cbbdf77a325791b5548b41093a0781311c (patch) | |
tree | e843d38afe4b197a2dd81316998477cedac26c52 /src/main/portfolio_util.h | |
parent | 6935324f45bd7f2c735ca4120a9e800ef8903442 (diff) |
Single driver for both sequential and portfolio
A "command executer" layer between parsing commands and invoking them.
New implementation of portfolio driver splits only when check-sat or query
command is encountered, and then switches back to sequential till the next
one. As side effect, restores functionality of interactive mode and
push/pops.
Diffstat (limited to 'src/main/portfolio_util.h')
-rw-r--r-- | src/main/portfolio_util.h | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/src/main/portfolio_util.h b/src/main/portfolio_util.h new file mode 100644 index 000000000..eed993b9f --- /dev/null +++ b/src/main/portfolio_util.h @@ -0,0 +1,209 @@ +/********************* */ +/*! \file portfolio_util.h + ** \verbatim + ** Original author: kshitij + ** Major contributors: taking, mdeters + ** Minor contributors (to current version): + ** This file is part of the CVC4 prototype. + ** Copyright (c) 2009, 2010, 2011 The Analysis of Computer Systems Group (ACSys) + ** Courant Institute of Mathematical Sciences + ** New York University + ** See the file COPYING in the top-level source directory for licensing + ** information.\endverbatim + ** + ** \brief Code relevant only for portfolio builds + **/ + +#ifndef __CVC4_PORTFOLIO_UTIL_H +#define __CVC4_PORTFOLIO_UTIL_H + +#include <queue> + +#include "expr/pickler.h" +#include "util/channel.h" +#include "util/lemma_input_channel.h" +#include "util/lemma_output_channel.h" +#include "main/options.h" + +namespace CVC4 { + +typedef expr::pickle::Pickle ChannelFormat; + +template <typename T> +class EmptySharedChannel: public SharedChannel<T> { +public: + EmptySharedChannel(int) {} + bool push(const T&) { return true; } + T pop() { return T(); } + bool empty() { return true; } + bool full() { return false; } +}; + +class PortfolioLemmaOutputChannel : public LemmaOutputChannel { +private: + std::string d_tag; + SharedChannel<ChannelFormat>* d_sharedChannel; + expr::pickle::MapPickler d_pickler; + +public: + int cnt; + PortfolioLemmaOutputChannel(std::string tag, + SharedChannel<ChannelFormat> *c, + ExprManager* em, + VarMap& to, + VarMap& from) : + d_tag(tag), + d_sharedChannel(c), + d_pickler(em, to, from), + cnt(0) + {} + + void notifyNewLemma(Expr lemma) { + if(Debug.isOn("disable-lemma-sharing")) { + return; + } + if(options::sharingFilterByLength() >= 0) { + // 0 would mean no-sharing effectively + if(int(lemma.getNumChildren()) > options::sharingFilterByLength()) { + return; + } + } + ++cnt; + Trace("sharing") << d_tag << ": " << lemma << std::endl; + expr::pickle::Pickle pkl; + try { + d_pickler.toPickle(lemma, pkl); + d_sharedChannel->push(pkl); + if(Trace.isOn("showSharing") && options::thread_id() == 0) { + *options::out() << "thread #0: notifyNewLemma: " << lemma + << std::endl; + } + } catch(expr::pickle::PicklingException& p){ + Trace("sharing::blocked") << lemma << std::endl; + } + } + +};/* class PortfolioLemmaOutputChannel */ + +class PortfolioLemmaInputChannel : public LemmaInputChannel { +private: + std::string d_tag; + SharedChannel<ChannelFormat>* d_sharedChannel; + expr::pickle::MapPickler d_pickler; + +public: + PortfolioLemmaInputChannel(std::string tag, + SharedChannel<ChannelFormat>* c, + ExprManager* em, + VarMap& to, + VarMap& from) : + d_tag(tag), + d_sharedChannel(c), + d_pickler(em, to, from){ + } + + bool hasNewLemma(){ + Debug("lemmaInputChannel") << d_tag << ": " << "hasNewLemma" << std::endl; + return !d_sharedChannel->empty(); + } + + Expr getNewLemma() { + Debug("lemmaInputChannel") << d_tag << ": " << "getNewLemma" << std::endl; + expr::pickle::Pickle pkl = d_sharedChannel->pop(); + + Expr e = d_pickler.fromPickle(pkl); + if(Trace.isOn("showSharing") && options::thread_id() == 0) { + *options::out() << "thread #0: getNewLemma: " << e << std::endl; + } + return e; + } + +};/* class PortfolioLemmaInputChannel */ + +std::vector<Options> parseThreadSpecificOptions(Options opts); + +template<typename T> +void sharingManager(unsigned numThreads, + SharedChannel<T> *channelsOut[], // out and in with respect + SharedChannel<T> *channelsIn[], + SmtEngine *smts[]) // to smt engines +{ + Trace("sharing") << "sharing: thread started " << std::endl; + std::vector <int> cnt(numThreads); // Debug("sharing") + + std::vector< std::queue<T> > queues; + for(unsigned i = 0; i < numThreads; ++i){ + queues.push_back(std::queue<T>()); + } + + const unsigned int sharingBroadcastInterval = 1; + + boost::mutex mutex_activity; + + /* Disable interruption, so that we can check manually */ + boost::this_thread::disable_interruption di; + + while(not boost::this_thread::interruption_requested()) { + + boost::this_thread::sleep + (boost::posix_time::milliseconds(sharingBroadcastInterval)); + + for(unsigned t = 0; t < numThreads; ++t) { + + /* No activity on this channel */ + if(channelsOut[t]->empty()) continue; + + /* Alert if channel full, so that we increase sharingChannelSize + or decrease sharingBroadcastInterval */ + Assert(not channelsOut[t]->full()); + + T data = channelsOut[t]->pop(); + + if(Trace.isOn("sharing")) { + ++cnt[t]; + Trace("sharing") << "sharing: Got data. Thread #" << t + << ". Chunk " << cnt[t] << std::endl; + } + + for(unsigned u = 0; u < numThreads; ++u) { + if(u != t){ + Trace("sharing") << "sharing: adding to queue " << u << std::endl; + queues[u].push(data); + } + }/* end of inner for: broadcast activity */ + + } /* end of outer for: look for activity */ + + for(unsigned t = 0; t < numThreads; ++t){ + /* Alert if channel full, so that we increase sharingChannelSize + or decrease sharingBroadcastInterval */ + Assert(not channelsIn[t]->full()); + + while(!queues[t].empty() && !channelsIn[t]->full()){ + Trace("sharing") << "sharing: pushing on channel " << t << std::endl; + T data = queues[t].front(); + channelsIn[t]->push(data); + queues[t].pop(); + } + } + } /* end of infinite while */ + + Trace("interrupt") + << "sharing thread interrupted, interrupting all smtEngines" << std::endl; + + for(unsigned t = 0; t < numThreads; ++t) { + Trace("interrupt") << "Interrupting thread #" << t << std::endl; + try{ + smts[t]->interrupt(); + }catch(ModalException &e){ + // It's fine, the thread is probably not there. + Trace("interrupt") << "Could not interrupt thread #" << t << std::endl; + } + } + + Trace("sharing") << "sharing: Interrupted, exiting." << std::endl; +} + +}/*CVC4 namespace*/ + +#endif /* __CVC4_PORTFOLIO_UTIL_H */ |