Fawkes API  Fawkes Development Version
qa_bb_remote.cpp
00001 
00002 /***************************************************************************
00003  *  qa_bb_remote.cpp - BlackBoard remote access QA
00004  *
00005  *  Created: Mon Mar 03 17:31:18 2008
00006  *  Copyright  2006-2008  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 
00025 /// @cond QA
00026 
00027 #include <blackboard/local.h>
00028 #include <blackboard/remote.h>
00029 #include <blackboard/exceptions.h>
00030 #include <blackboard/bbconfig.h>
00031 #include <blackboard/interface_listener.h>
00032 
00033 #include <interfaces/TestInterface.h>
00034 
00035 #include <interface/interface_info.h>
00036 #include <core/exceptions/system.h>
00037 #include <netcomm/fawkes/client.h>
00038 #include <netcomm/fawkes/server_thread.h>
00039 #include <utils/time/time.h>
00040 
00041 #include <signal.h>
00042 #include <cstdlib>
00043 #include <cstring>
00044 #include <cstdio>
00045 
00046 #include <iostream>
00047 #include <vector>
00048 
00049 using namespace std;
00050 using namespace fawkes;
00051 
00052 
00053 bool quit = false;
00054 
00055 void
00056 signal_handler(int signum)
00057 {
00058   quit = true;
00059 }
00060 
00061 
00062 #define NUM_CHUNKS 5
00063 
00064 void
00065 test_messaging(TestInterface *ti_reader, TestInterface *ti_writer)
00066 {
00067   while (! quit) {
00068     int expval = ti_reader->test_int() + 1;
00069     TestInterface::SetTestIntMessage *m = new TestInterface::SetTestIntMessage(expval);
00070     unsigned int msgid = ti_reader->msgq_enqueue(m);
00071     printf("Sent with message ID %u\n", msgid);
00072 
00073     if ( ti_writer->msgq_size() > 1 ) {
00074       cout << "Error, more than one message! flushing." << endl;
00075       ti_writer->msgq_flush();
00076     }
00077 
00078     usleep(100000);
00079 
00080     if ( ti_writer->msgq_first() != NULL ) {
00081       if ( ti_writer->msgq_first_is<TestInterface::SetTestStringMessage>() ) {
00082         TestInterface::SetTestStringMessage *msg = ti_writer->msgq_first(msg);
00083         printf("Received message of ID %u, Message improperly detected to be a SetTestStringMessage\n", msg->id());
00084       }
00085       if ( ti_writer->msgq_first_is<TestInterface::SetTestIntMessage>() ) {
00086         TestInterface::SetTestIntMessage *m2 = ti_writer->msgq_first<TestInterface::SetTestIntMessage>();
00087         printf("Received message with ID %u (enqueue time: %s)\n", m2->id(), m2->time_enqueued()->str());
00088         ti_writer->set_test_int( m2->test_int() );
00089         try {
00090           ti_writer->write();
00091         } catch (InterfaceWriteDeniedException &e) {
00092           cout << "BUG: caught write denied exception" << endl;
00093           e.print_trace();
00094         }
00095         ti_writer->msgq_pop();
00096       } else {
00097         cout << "Illegal message '" << ti_writer->msgq_first()->type() << "' type received" << endl;
00098       }
00099 
00100       usleep(100000);
00101 
00102       //cout << "Reading value from reader interface.. " << flush;
00103       ti_reader->read();
00104       int val = ti_reader->test_int();
00105       if ( val == expval ) {
00106         //cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
00107       } else {
00108         cout << " failure, value is " << ti_reader->test_int() << ", expected "
00109              << expval << endl;
00110       }
00111     } else {
00112       printf("No message in queue, if network test this means the message was dropped\n");
00113     }
00114 
00115     usleep(10);
00116   }
00117 }
00118 
00119 class SyncInterfaceListener : public fawkes::BlackBoardInterfaceListener
00120 {
00121 public:
00122   SyncInterfaceListener(fawkes::Interface *reader,
00123                         fawkes::Interface *writer,
00124                         fawkes::BlackBoard *reader_bb,
00125                         fawkes::BlackBoard *writer_bb)
00126     : BlackBoardInterfaceListener("SyncInterfaceListener(%s-%s)", writer->uid(), reader->id())
00127   {
00128     __reader    = reader;
00129     __writer    = writer;
00130     __reader_bb = reader_bb;
00131     __writer_bb = writer_bb;
00132 
00133     bbil_add_data_interface(__reader);
00134     bbil_add_message_interface(__writer);
00135 
00136     __reader_bb->register_listener(this, BlackBoard::BBIL_FLAG_DATA);
00137     __writer_bb->register_listener(this, BlackBoard::BBIL_FLAG_MESSAGES);
00138   }
00139 
00140 
00141   /** Destructor. */
00142   ~SyncInterfaceListener()
00143   {
00144     __reader_bb->unregister_listener(this);
00145     __writer_bb->unregister_listener(this);
00146   }
00147 
00148 
00149   bool
00150   bb_interface_message_received(Interface *interface,
00151                                 Message *message) throw()
00152   {
00153     try {
00154       if ( interface == __writer ) {
00155         printf("%s: Forwarding message\n", bbil_name());
00156         Message *m = message->clone();
00157         m->set_hops(message->hops());
00158         m->ref();
00159         __reader->msgq_enqueue(m);
00160         message->set_id(m->id());
00161         m->unref();
00162         return false;
00163       } else {
00164         // Don't know why we were called, let 'em enqueue
00165         printf("%s: Message received for unknown interface\n", bbil_name());
00166         return true;
00167       }
00168     } catch (Exception &e) {
00169       printf("%s: Exception when message received\n", bbil_name());
00170       e.print_trace();
00171       return false;
00172     }
00173   }
00174 
00175 
00176   void
00177   bb_interface_data_changed(Interface *interface) throw()
00178   {
00179     try {
00180       if ( interface == __reader ) {
00181         //__logger->log_debug(bbil_name(), "Copying data");
00182         __reader->read();
00183         __writer->copy_values(__reader);
00184         __writer->write();
00185       } else {
00186         // Don't know why we were called, let 'em enqueue
00187         printf("%s: Data changed for unknown interface", bbil_name());
00188       }
00189     } catch (Exception &e) {
00190       printf("%s: Exception when data changed\n", bbil_name());
00191       e.print_trace();
00192     }
00193   }
00194 
00195  private:
00196   fawkes::Interface  *__writer;
00197   fawkes::Interface  *__reader;
00198 
00199   fawkes::BlackBoard *__writer_bb;
00200   fawkes::BlackBoard *__reader_bb;
00201 
00202 };
00203 
00204 
00205 int
00206 main(int argc, char **argv)
00207 {
00208   signal(SIGINT, signal_handler);
00209 
00210   LocalBlackBoard *llbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
00211   BlackBoard *lbb = llbb;
00212 
00213   FawkesNetworkServerThread  *fns = new FawkesNetworkServerThread(1910);
00214   fns->start();
00215 
00216   llbb->start_nethandler(fns);
00217 
00218   BlackBoard *rbb = new RemoteBlackBoard("localhost", 1910);
00219 
00220   InterfaceInfoList *infl = rbb->list_all();
00221   for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00222     const unsigned char *hash = (*i).hash();
00223     char phash[__INTERFACE_HASH_SIZE * 2 + 1];
00224     memset(phash, 0, sizeof(phash));
00225     for (unsigned int j = 0; j < __INTERFACE_HASH_SIZE; ++j) {
00226       sprintf(&phash[j * 2], "%02x", hash[j]);
00227     }
00228     printf("%s::%s (%s), w:%i  r:%u  s:%u\n",
00229            (*i).type(), (*i).id(), phash, (*i).has_writer(),
00230            (*i).num_readers(), (*i).serial());
00231   }
00232   delete infl;
00233 
00234   //TestInterface *ti_writer;
00235   TestInterface *ti_reader;
00236   TestInterface *ti_writer;
00237   try {
00238     cout << "Opening interfaces.. " << flush;
00239     ti_writer = rbb->open_for_writing<TestInterface>("SomeID");
00240     ti_reader = rbb->open_for_reading<TestInterface>("SomeID");
00241     cout << "success, "
00242          << "writer hash=" << ti_writer->hash_printable()
00243          << "  reader hash=" << ti_reader->hash_printable()
00244          << endl;
00245   } catch (Exception &e) {
00246     cout << "failed! Aborting" << endl;
00247     e.print_trace();
00248     exit(1);
00249   }
00250 
00251   try {
00252     cout << "Trying to open second writer.. " << flush;
00253     TestInterface *ti_writer_two;
00254     ti_writer_two = rbb->open_for_writing<TestInterface>("SomeID");
00255     cout << "BUG: Detection of second writer did NOT work!" << endl;
00256     exit(2);
00257   } catch (BlackBoardWriterActiveException &e) {
00258     cout << "exception caught as expected, detected and prevented second writer!" << endl;
00259   }
00260 
00261   try {
00262     cout << "Trying to open third writer.. " << flush;
00263     TestInterface *ti_writer_three;
00264     ti_writer_three = rbb->open_for_writing<TestInterface>("AnotherID");
00265     cout << "No exception as expected, different ID ok!" << endl;
00266     rbb->close(ti_writer_three);
00267   } catch (BlackBoardWriterActiveException &e) {
00268     cout << "BUG: Third writer with different ID detected as another writer!" << endl;
00269     exit(3);
00270   }
00271 
00272   cout << endl << endl
00273        << "Running data tests ==================================================" << endl;
00274 
00275   cout << "Writing initial value ("
00276        << TestInterface::TEST_CONSTANT << ") into interface as TestInt" << endl;
00277   ti_writer->set_test_int( TestInterface::TEST_CONSTANT );
00278   try {
00279     ti_writer->write();
00280   } catch (InterfaceWriteDeniedException &e) {
00281     cout << "BUG: caught write denied exception" << endl;
00282     e.print_trace();
00283   }
00284 
00285   cout << "Giving some time to have value processed" << endl;
00286   usleep(100000);
00287 
00288   cout << "Reading value from reader interface.. " << flush;
00289   ti_reader->read();
00290   int val = ti_reader->test_int();
00291   if ( val == TestInterface::TEST_CONSTANT ) {
00292     cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
00293   } else {
00294     cout << " failure, value is " << ti_reader->test_int() << ", expected "
00295          << TestInterface::TEST_CONSTANT << endl;
00296   }
00297 
00298   cout << "Closing interfaces.. " << flush;
00299   try {
00300     rbb->close(ti_reader);
00301     rbb->close(ti_writer);
00302     cout << "done" << endl;
00303   } catch (Exception &e) {
00304     cout << "failed" << endl;
00305     e.print_trace();
00306   }
00307 
00308   cout << endl << endl << "Starting MESSAGING tests" << endl 
00309        << "Press Ctrl-C to continue with next test" << endl << endl;
00310 
00311   ti_writer = lbb->open_for_writing<TestInterface>("Messaging");
00312   ti_reader = rbb->open_for_reading<TestInterface>("Messaging");
00313 
00314   printf("Writer serial: %u  shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
00315   printf("Reader serial: %u  shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
00316 
00317   test_messaging(ti_reader, ti_writer);
00318 
00319   rbb->close(ti_reader);
00320   lbb->close(ti_writer);
00321 
00322   cout << endl << endl << "Starting MESSAGING tests, doing repeater scenario" << endl 
00323        << "Press Ctrl-C to continue with next test" << endl << endl;
00324   quit = false;
00325 
00326   delete rbb;
00327 
00328   LocalBlackBoard *repllbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
00329 
00330   FawkesNetworkServerThread  *repfns = new FawkesNetworkServerThread(1911);
00331   repfns->start();
00332 
00333   repllbb->start_nethandler(repfns);
00334 
00335   BlackBoard *rep_rbb = new RemoteBlackBoard("localhost", 1911);
00336   rbb = new RemoteBlackBoard("localhost", 1911);
00337 
00338   TestInterface *rep_reader;
00339   TestInterface *rep_writer;
00340 
00341   ti_writer = rbb->open_for_writing<TestInterface>("Messaging");
00342   ti_reader = lbb->open_for_reading<TestInterface>("Messaging");
00343 
00344   rep_reader = rep_rbb->open_for_reading<TestInterface>("Messaging");
00345   rep_writer = lbb->open_for_writing<TestInterface>("Messaging");
00346 
00347   printf("Writer serial: %u  shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
00348   printf("Reader serial: %u  shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
00349 
00350   SyncInterfaceListener *sil = new SyncInterfaceListener(rep_reader, rep_writer, rep_rbb, lbb);
00351 
00352   test_messaging(ti_reader, ti_writer);
00353 
00354   delete sil;
00355   lbb->close(ti_reader);
00356   rbb->close(ti_writer);
00357   rep_rbb->close(rep_reader);
00358   lbb->close(rep_writer);
00359   delete repllbb;
00360   delete rep_rbb;
00361 
00362   cout << "Tests done" << endl;
00363 
00364   delete rbb;
00365   delete llbb;
00366   delete fns;
00367 }
00368 
00369 
00370 /// @endcond