111 const std::string& message) :
119 memcpy(this->
buffer +
sizeof(JPrefix_t), message.data(), message.size());
130 static_cast<JPrefix_t&
>(*this) = message;
438 queue.push_back(message);
452 for (std::deque<JDispatch>::iterator i =
queue.begin(); i !=
queue.end(); ) {
496 for (
iterator i = this->begin(); i != this->end(); ++i) {
507 for (
iterator i = this->begin(); i != this->end(); ++i) {
523 return out <<
"(" << message.
getTag() <<
"," << message.
size() <<
")";
536 return out <<
"[" <<
socket.getFileDescriptor() <<
"]";
604 catch(
const exception &error) {
615 DEBUG(
"Memory limit " <<
setw(10) << JDispatch::MEMORY_LIMIT <<
endl);
616 DEBUG(
"Queue limit " <<
setw(10) << JClient::QUEUE_LIMIT <<
endl);
624 for (JClientList::iterator
client = clientList.begin();
client != clientList.end(); ++
client) {
626 if (!
client->in.isReady()) {
630 if (
client->out.isReset()) {
632 if (!
client->queue.empty()) {
637 client->decrementRequest();
642 }
else if (
client->out.isBusy()) {
651 nfds = select(timeout_us);
653 catch(
const exception& error) {
659 for (JClientList::iterator
client = clientList.begin();
client != clientList.end(); ) {
668 catch(
const exception& error) {
670 ERROR(
"Remove (3) client" << *
client <<
"<" <<
client->getNickname() <<
">: " << error.what() <<
endl);
672 if (
client->getNickname() !=
"") {
686 if (
client->in.isReady()) {
690 bool special = JControlHost::maybe_special(
client->in.prefix);
696 if (
client->in.prefix.getTag() == DISPTAG_Subscribe) {
698 client->setSubscription(
string(
client->in.getRemainingData(),
client->in.getRemainingSize()));
700 }
else if (
client->in.prefix.getTag() == DISPTAG_MyId) {
702 client->setNickname(
string(
client->in.getRemainingData(),
client->in.getRemainingSize()));
706 }
else if (
client->in.prefix.getTag() == DISPTAG_Gime) {
708 client->incrementRequest();
710 }
else if (
client->in.prefix.getTag() == DISPTAG_Always) {
714 }
else if (
client->in.prefix.getTag() == DISPTAG_WhereIs) {
716 string nick_name(
client->in.getRemainingData(),
client->in.getRemainingSize());
719 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
720 if (i->getNickname() == nick_name) {
721 buffer +=
" " + i->getHostname();
727 socket.PutFullString(DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));
737 }
else if (
client->in.prefix.getTag() == DISPTAG_ShowStat) {
743 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
747 for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
748 total += message->size();
751 cout <<
"client[" << i->getFileDescriptor() <<
"] " << i->getNickname() <<
endl;
752 cout <<
"tag - all:";
757 cout <<
"tag - any:";
762 cout <<
"queue " << i->queue.size() <<
' ' << total <<
"B" <<
endl;
767 }
else if (
client->in.prefix.getTag() == DISPTAG_Debug) {
783 if (JDispatch::MEMORY_TOTAL > JDispatch::MEMORY_LIMIT) {
785 WARNING(
"Memory " <<
setw(10) << JDispatch::MEMORY_TOTAL <<
" > " <<
setw(10) << JDispatch::MEMORY_LIMIT <<
endl);
800 if (
client->out.isReady()) {
802 client->queue.pop_front();
808 catch(
const exception& error) {
810 DEBUG(
"Remove (2) client" << *
client <<
"<" <<
client->getNickname() <<
">: " << error.what() <<
endl);
812 if (
client->getNickname() !=
"") {
829 socket.setReuseAddress(
true);
830 socket.setKeepAlive (
true);
831 socket.setNonBlocking (
true);
int main(int argc, char *argv[])
Base class for memory management.
General purpose messaging.
#define DEBUG(A)
Message macros.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
int getFileDescriptor() const
Get file descriptor.
Exception for ControlHost.
Exception for failure of memory allocation.
Memory management class for create/release policy based on malloc()/free().
static T * create()
Create object in memory.
static void release(T *p)
Release memory.
void initialise()
Initialise counter.
void attach(const JSharedCounter &object)
Attach this counter to given shared counter object.
List of ControlHost client managers.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
void drop()
Drop all messages from client queues for which the client has not the 'all' subscription.
JClientList()
Default constructor.
ControlHost client manager.
std::deque< JDispatch > queue
queue for outgoing messages
void setRequestAll()
Set no request.
const std::set< JTag > & getSubscriptionAny() const
Get subscription.
const std::string & getNickname() const
Get nick name.
std::set< JTag > subscriptionAll
bool checkSubscriptionAll(const JPrefix_t &prefix) const
Check subscription for given prefix.
bool checkSubscription(const JPrefix_t &prefix) const
Check subscription for given prefix.
static unsigned int QUEUE_LIMIT
Maximum number of messages in queue.
JSocketNonblockingWriter out
writer for outgoing messages
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
void drop()
Drop all messages for which the client has not the 'all' subscription.
bool checkRequest() const
Check request.
JClient()
Default constructor.
std::set< JTag > subscriptionAny
JClient(const JTCPSocket &socket)
Constructor.
void setNickname(const std::string &nick_name)
Set nick name.
void incrementRequest()
Increment request by one.
const std::set< JTag > & getSubscriptionAll() const
Get subscription.
void decrementRequest()
Decrement request by one.
bool setSubscription(const std::string &subscription)
Set subscription.
JSocketInputChannel_t in
reader for incoming messages
bool checkSubscriptionAny(const JPrefix_t &prefix) const
Check subscription for given prefix.
Data structure of a ControlHost message.
JDispatch(const JDispatch &message)
Copy constructor.
int size() const
Get size.
void create()
Allocate memory.
JMalloc< char > JMemory_t
void release()
Release memory.
static long long int MEMORY_TOTAL
Total size of data [Bytes].
JDispatch(const JPrefix_t &prefix, const char *data)
Constructor.
static long long int MEMORY_LIMIT
Limit size of data [Bytes].
JDispatch(const JTag &tag, const std::string &message)
Constructor.
JDispatch()
Default constructor.
JDispatch & operator=(const JDispatch &message)
Assignment operator.
const char * data() const
Get data.
int getSize() const
Get size.
void setSize(const long long int length)
Set size.
void setReaderMask(const JAbstractFile &file)
Set reader mask.
bool hasReaderMask(const JAbstractFile &file) const
Has reader file.
void setWriterMask(const JAbstractFile &file)
Set writer mask.
bool hasWriterMask(const JAbstractFile &file) const
Has writer file.
Wrapper class for select call.
Auxiliary class for non-blocking socket I/O.
int getSize() const
Get size of pending data.
Non-blocking socket writer.
Auxiliary class for non-blocking socket I/O.
int getCounter() const
Get number of I/O attempts.
bool isReady() const
Check ready status.
bool isBusy() const
Check busy status.
const JTag & getTag() const
Get tag.
Template definition of a multi-dimensional oscillation probability interpolation table.
JWriter & write(JWriter &out) const override final
Write to output.
JReader & read(JReader &in) override final
Read from input.
Utility class to parse command line options.
static const size_t buffer_size
std::ostream & operator<<(std::ostream &out, const morphology_type &object)
Write morphology to output stream.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
void setSizeOfPacket(const int size, JPrefix_t &prefix)
Set total size of internet packet.
JSocketBuffer< const char > JSocketOutputBuffer
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
unsigned long long int getRAM()
Get RAM of this CPU.