48 #include "BESInterface.h" 50 #include "TheBESKeys.h" 51 #include "BESResponseHandler.h" 52 #include "BESAggFactory.h" 53 #include "BESAggregationServer.h" 54 #include "BESReporterList.h" 55 #include "BESContextManager.h" 57 #include "BESExceptionManager.h" 59 #include "BESDataNames.h" 62 #include "BESStopWatch.h" 63 #include "BESTimeoutError.h" 64 #include "BESInternalError.h" 65 #include "BESInternalFatalError.h" 71 list<p_bes_init> BESInterface::_init_list;
72 list<p_bes_end> BESInterface::_end_list;
74 static jmp_buf timeout_jump;
75 static bool timeout_jump_valid =
false;
86 static volatile int timeout = 0;
88 #define BES_TIMEOUT_KEY "BES.TimeOutInSeconds" 99 static void catch_sig_alarm(
int sig)
101 if (sig == SIGALRM) {
102 LOG(
"Child listener timeout after " << timeout <<
" seconds, exiting." << endl);
107 if (timeout_jump_valid)
108 longjmp(timeout_jump, 1);
113 signal(SIGTERM, SIG_DFL);
119 static void register_signal_handler()
121 struct sigaction act;
122 sigemptyset(&act.sa_mask);
123 sigaddset(&act.sa_mask, SIGALRM);
129 act.sa_handler = catch_sig_alarm;
130 if (sigaction(SIGALRM, &act, 0))
131 throw BESInternalFatalError(
"Could not register a handler to catch alarm/timeout.", __FILE__, __LINE__);
162 static pthread_t alarm_thread;
164 static void* alarm_wait(
void * )
166 BESDEBUG(
"bes",
"Starting: " << __PRETTY_FUNCTION__ << endl);
170 sigemptyset(&sigset);
171 sigaddset(&sigset, SIGALRM);
172 sigprocmask(SIG_BLOCK, &sigset, NULL);
177 int result = sigwait(&sigset, &sig);
179 BESDEBUG(
"bes",
"Fatal error establishing timeout: " << strerror(result) << endl);
180 throw BESInternalFatalError(
string(
"Fatal error establishing timeout: ") + strerror(result), __FILE__, __LINE__);
182 else if (result == 0 && sig == SIGALRM) {
183 BESDEBUG(
"bes",
"Timeout found in " << __PRETTY_FUNCTION__ << endl);
188 oss <<
"While waiting for a timeout, found signal '" << result <<
"' in " << __PRETTY_FUNCTION__ << ends;
189 BESDEBUG(
"bes", oss.str() << endl);
194 static void wait_for_timeout()
196 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ << endl);
198 pthread_attr_t thread_attr;
200 if (pthread_attr_init(&thread_attr) != 0)
202 if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED ) != 0)
203 throw BESInternalFatalError(
"Failed to complete pthread attribute initialization.", __FILE__, __LINE__);
205 int status = pthread_create(&alarm_thread, &thread_attr, alarm_wait, NULL);
211 BESInterface::BESInterface(ostream *output_stream) :
212 _strm(output_stream), _timeout_from_keys(0), _dhi(0), _transmitter(0)
214 if (!output_stream) {
215 throw BESInternalError(
"output stream must be set in order to output responses", __FILE__, __LINE__);
223 string timeout_key_value;
226 istringstream iss(timeout_key_value);
227 iss >> _timeout_from_keys;
231 register_signal_handler();
238 BESInterface::~BESInterface()
280 extern BESStopWatch *bes_timing::elapsedTimeToReadStart;
281 extern BESStopWatch *bes_timing::elapsedTimeToTransmitStart;
283 int BESInterface::execute_request(
const string &from)
285 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ << endl);
288 throw BESInternalError(
"DataHandlerInterface can not be null", __FILE__, __LINE__);
292 if (BESISDEBUG(TIMING_LOG)) {
293 sw.
start(
"BESInterface::execute_request", _dhi->
data[REQUEST_ID]);
295 bes_timing::elapsedTimeToReadStart =
new BESStopWatch();
296 bes_timing::elapsedTimeToReadStart->
start(
"TIME_TO_READ_START", _dhi->
data[REQUEST_ID]);
298 bes_timing::elapsedTimeToTransmitStart =
new BESStopWatch();
299 bes_timing::elapsedTimeToTransmitStart->
start(
"TIME_TO_TRANSMIT_START", _dhi->
data[REQUEST_ID]);
303 throw BESInternalError(
"DataHandlerInterface can not be null", __FILE__, __LINE__);
306 _dhi->set_output_stream(_strm);
307 _dhi->
data[REQUEST_FROM] = from;
309 pid_t thepid = getpid();
312 _dhi->
data[SERVER_PID] = ss.str();
324 *(BESLog::TheLog()) << _dhi->
data[SERVER_PID] <<
" from " << _dhi->
data[REQUEST_FROM] <<
" request received" 334 throw BESInternalError(
"Unable to transmit the response, no transmitter", __FILE__, __LINE__);
343 if (setjmp(timeout_jump) == 0) {
344 timeout_jump_valid =
true;
347 timeout_jump_valid =
false;
351 oss <<
"BES listener timeout after " << timeout <<
" seconds." << ends;
355 _dhi->executed =
true;
358 timeout_jump_valid =
false;
361 catch (bad_alloc &e) {
362 timeout_jump_valid =
false;
366 catch (exception &e) {
367 timeout_jump_valid =
false;
372 timeout_jump_valid =
false;
373 BESInternalError ex(
"An undefined exception has been thrown", __FILE__, __LINE__);
377 delete bes_timing::elapsedTimeToReadStart;
378 bes_timing::elapsedTimeToReadStart = 0;
380 delete bes_timing::elapsedTimeToTransmitStart;
381 bes_timing::elapsedTimeToTransmitStart = 0;
389 int BESInterface::finish(
int )
391 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ <<
" ***" << endl);
409 catch (bad_alloc &) {
410 string serr =
"BES out of memory";
415 string serr =
"An undefined exception has been thrown";
437 (*BESLog::TheLog()) <<
"Problem logging status: " << ex.
get_message() << endl;
440 (*BESLog::TheLog()) <<
"Unknown problem logging status" << endl;
447 (*BESLog::TheLog()) <<
"Problem reporting request: " << ex.
get_message() << endl;
450 (*BESLog::TheLog()) <<
"Unknown problem reporting request" << endl;
457 (*BESLog::TheLog()) <<
"Problem ending request: " << ex.
get_message() << endl;
460 (*BESLog::TheLog()) <<
"Unknown problem ending request" << endl;
466 int BESInterface::finish_with_error(
int status)
470 string serr =
"Finish_with_error called with no error object";
475 return finish(status);
478 void BESInterface::add_init_callback(p_bes_init init)
480 _init_list.push_back(init);
491 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::initialize", _dhi->
data[REQUEST_ID]);
493 BESDEBUG(
"bes",
"Initializing request: " << _dhi->
data[DATA_REQUEST] <<
" ... " << endl);
494 bool do_continue =
true;
495 init_iter i = _init_list.begin();
497 for (; i != _init_list.end() && do_continue ==
true; i++) {
499 do_continue = p(*_dhi);
503 BESDEBUG(
"bes",
"FAILED" << endl);
504 string se =
"Initialization callback failed, exiting";
508 BESDEBUG(
"bes",
"OK" << endl);
537 if (BESISDEBUG(TIMING_LOG))
538 sw.
start(
"BESInterface::execute_data_request_plan(\"" + _dhi->
data[DATA_REQUEST] +
"\")",
539 _dhi->
data[REQUEST_ID]);
544 string context = BESContextManager::TheManager()->
get_context(
"bes_timeout", found);
546 timeout = strtol(context.c_str(), NULL, 10);
547 VERBOSE(
"Set request timeout to " << timeout <<
" seconds (from context)." << endl);
550 else if (_timeout_from_keys != 0) {
551 timeout = _timeout_from_keys;
552 VERBOSE(
"Set request timeout to " << timeout <<
" seconds (from keys)." << endl);
558 "Executing request: " << _dhi->
data[DATA_REQUEST] <<
" ... " << endl);
563 BESDEBUG(
"bes",
"FAILED" << endl);
564 string se =
"The response handler \"" + _dhi->
action 565 +
"\" does not exist";
568 BESDEBUG(
"bes",
"OK" << endl);
597 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::invoke_aggregation", _dhi->
data[REQUEST_ID]);
599 if (_dhi->
data[AGG_CMD] !=
"") {
600 BESDEBUG(
"bes",
"aggregating with: " << _dhi->
data[AGG_CMD] <<
" ... "<< endl);
606 BESDEBUG(
"bes",
"FAILED" << endl);
607 string se =
"The aggregation handler " + _dhi->
data[AGG_HANDLER] +
"does not exist";
610 BESDEBUG(
"bes",
"OK" << endl);
630 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::transmit_data", _dhi->
data[REQUEST_ID]);
632 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Transmitting request: " << _dhi->
data[DATA_REQUEST] << endl);
639 (*BESLog::TheLog()) << strm.str() << endl;
640 BESDEBUG(
"bes",
" transmitting error info using transmitter ... " << endl << strm.str() << endl);
644 else if (_dhi->response_handler) {
645 BESDEBUG(
"bes",
" BESInterface::transmit_data() - Response handler " << _dhi->response_handler->
get_name() << endl);
647 _dhi->response_handler->
transmit(_transmitter, *_dhi);
654 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Transmitting error info using cout ... " << endl);
660 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Unable to transmit the response ... FAILED " << endl);
662 throw BESInternalError(
"Unable to transmit the response, no transmitter", __FILE__, __LINE__);
667 BESDEBUG(
"bes",
"BESInterface::transmit_data() - OK" << endl);
689 BESDEBUG(
"bes",
"Reporting on request: " << _dhi->
data[DATA_REQUEST] <<
" ... " << endl);
691 BESReporterList::TheList()->report(*_dhi);
693 BESDEBUG(
"bes",
"OK" << endl);
696 void BESInterface::add_end_callback(p_bes_end end)
698 _end_list.push_back(end);
708 BESDEBUG(
"bes",
"Ending request: " << _dhi->
data[DATA_REQUEST] <<
" ... " << endl);
709 end_iter i = _end_list.begin();
710 for (; i != _end_list.end(); i++) {
719 BESDEBUG(
"bes",
"Calling BESContainer::release()" << endl);
724 BESDEBUG(
"bes",
"OK" << endl);
731 if (_dhi) _dhi->
clean();
761 strm << BESIndent::LMarg <<
"BESInterface::dump - (" << (
void *)
this <<
")" << endl;
764 if (_init_list.size()) {
765 strm << BESIndent::LMarg <<
"termination functions:" << endl;
767 init_iter i = _init_list.begin();
768 for (; i != _init_list.end(); i++) {
771 strm << BESIndent::LMarg << (
void *) (*i) << endl;
773 BESIndent::UnIndent();
776 strm << BESIndent::LMarg <<
"termination functions: none" << endl;
779 if (_end_list.size()) {
780 strm << BESIndent::LMarg <<
"termination functions:" << endl;
782 end_iter i = _end_list.begin();
783 for (; i != _end_list.end(); i++) {
784 strm << BESIndent::LMarg << (
void *) (*i) << endl;
786 BESIndent::UnIndent();
789 strm << BESIndent::LMarg <<
"termination functions: none" << endl;
792 strm << BESIndent::LMarg <<
"data handler interface:" << endl;
795 BESIndent::UnIndent();
798 strm << BESIndent::LMarg <<
"transmitter:" << endl;
800 _transmitter->
dump(strm);
801 BESIndent::UnIndent();
804 strm << BESIndent::LMarg <<
"transmitter: not set" << endl;
806 BESIndent::UnIndent();
void clean()
clean up any information created within this data handler interface
virtual void dump(ostream &strm) const
dumps information about this object
error thrown if there is a user syntax error in the request or any other user error ...
exception thrown if an internal error is found and is fatal to the BES
exception thrown if inernal error encountered
virtual void initialize()
Initialize the BES object.
void next_container()
set the container pointer to the next * container in the list, null if at the end or no containers in...
virtual void transmit(BESTransmitter *transmitter, BESDataHandlerInterface &dhi)=0
transmit the response object built by the execute command using the specified transmitter object ...
virtual void transmit(BESTransmitter *transmitter, BESDataHandlerInterface &dhi)=0
transmit the informational object
virtual std::string get_message()
get the error message for this exception
virtual void execute(BESDataHandlerInterface &dhi)=0
knows how to build a requested response object
virtual int exception_manager(BESError &e)
Manage any exceptions thrown during the whole process.
virtual void aggregate(BESDataHandlerInterface &dhi)=0
aggregate the response object
virtual string get_context(const string &name, bool &found)
retrieve the value of the specified context from the BES
virtual bool start(string name)
virtual void transmit_data()
Transmit the resulting response object.
handler object that knows how to create a specific response object
virtual string get_name() const
return the name of this response object
Abstract exception class for the BES with basic string message.
virtual void report_request()
Report the request and status of the request to BESReporterList::TheList()
virtual void validate_data_request()
Validate the incoming request information.
virtual void build_data_request_plan()=0
Build the data request plan.
virtual void clean()
Clean up after the request.
void dump(ostream &strm) const
dumps information about this object
void get_value(const string &s, string &val, bool &found)
Retrieve the value of a given key, if set.
virtual void invoke_aggregation()
Aggregate the resulting response object.
map< string, string > data
the map of string data that will be required for the current request.
virtual void end_request()
End the BES request.
virtual BESAggregationServer * find_handler(const string &handler_name)
returns the aggregation handler with the given name in the list
virtual void log_status()
Log the status of the request.
virtual void dump(ostream &strm) const
dumps information about this object
void first_container()
set the container pointer to the first container in the containers list
BESInfo * error_info
error information object
virtual int handle_exception(BESError &e, BESDataHandlerInterface &dhi)
Manage any exceptions thrown during the handling of a request.
Abstraction representing mechanism for aggregating data.
virtual void execute_data_request_plan()
Execute the data request plan.
virtual void print(ostream &strm)
print the information from this informational object to the specified stream
static BESKeys * TheKeys()
string action
the response object requested, e.g. das, dds
BESContainer * container
pointer to current container in this interface