Archived

This forum has been archived. Please start a new discussion on GitHub.

Publish/Subscribe issue

I'm using 3 different ways to subscribe to topics (or records, as we call them), with varying degrees of success.

I will call the 3 ways, Mode A, B and C.

Total success would be to get snapshot and updates to all subscribers. I'm not getting total success with any of the modes A, B or C.

The results matrix is as follows:

Mode IceStorm Publisher subscriber Subscriber result
A ldn-analyt-009 ldn-analyt-29 ldn-analyt-29 snapshot + update
A ldn-analyt-009 ldn-analyt-29 ldn-analyt-009 nothing
A ldn-analyt-009 ldn-analyt-009 ldn-analyt-29 nothing
A ldn-analyt-009 ldn-analyt-009 ldn-analyt-009 snapshot + update
B ldn-analyt-009 ldn-analyt-29 ldn-analyt-29 update
B ldn-analyt-009 ldn-analyt-29 ldn-analyt-009 update
B ldn-analyt-009 ldn-analyt-009 ldn-analyt-29 update
B ldn-analyt-009 ldn-analyt-009 ldn-analyt-009 update
C ldn-analyt-009 ldn-analyt-29 ldn-analyt-29 nothing
C ldn-analyt-009 ldn-analyt-29 ldn-analyt-009 nothing
C ldn-analyt-009 ldn-analyt-009 ldn-analyt-29 nothing
C ldn-analyt-009 ldn-analyt-009 ldn-analyt-009 nothing


The subscriber code for the three modes is as follows:
bool CSubscriber::RequestRecord(std::string strRecordName, CListenerIce* pListener)
{
	try
	{
		ValueObserverPrx observerValue;
		auto observerIter = m_Topics.find(strRecordName);
		if (observerIter == m_Topics.end())
		{
			auto pListeners = std::make_shared<std::set<CListenerIce*>>();
			pListeners->insert(pListener);
			observerValue = ValueObserverPrx::uncheckedCast(m_AdapterPtr->addWithUUID(new ValueObserverI(pListeners)));
			IceStorm::TopicPrx topic = ValueRecordI::getTopic(m_TopicManager, strRecordName, false);

			// Mode A
			//auto ValueProxy = ValueRecordPrx::uncheckedCast(m_CommunicatorIce->stringToProxy(strRecordName));
			//ValueProxy->subscribe(observerValue);
			// end Mode A

			// Mode B
			//auto ValueProxy = ValueRecordPrx::uncheckedCast(topic->subscribeAndGetPublisher(IceStorm::QoS(), observerValue));
			// end Mode B

			// Mode C
			auto ValueProxy = ValueRecordPrx::uncheckedCast(topic->getPublisher());
			ValueProxy->subscribe(observerValue);
			// end Mode C

			m_Topics[strRecordName] = make_tuple(observerValue, pListeners);
		}
		else
		{
			observerValue = get<OBSERVER>(observerIter->second);
			auto& listeners = get<LISTENERS>(observerIter->second);
			ASSERT(listeners->find(pListener) == listeners->end());
			listeners->insert(pListener);
			CLogger::Instance().Log(eReutersFeed, "%s : already subscribed else where, use old observer", strRecordName.c_str());
			auto pValue = observerValue->GetValue();
			auto pCValue = ValueObserverI::GetValue(pValue);
			pListener->HandleRecordUpdate(pCValue, true);
		}

		return true;
	}
	catch (IceUtil::Exception& ex)
	{
		CLogger::Instance().Log(eReutersFeed, "%s, %s : subscribe failed", ex.what(), strRecordName.c_str());
		return false;
	}

	return false;
}

Mode C is enabled in this code snippet. Modes A and B are commented out.

I suspect that Mode A works only when the publisher and subscriber are on the same machine because both publisher and subscriber are in the same application and share the same ICE::Communicator.

Mode B works as expected, not delivering the snapshot.

Mode C is the one I would expect to work for me but the subscribe is not being called in the line
ValueProxy->subscribe(observerValue);

So the problem seems to be related to the proxy ValueRecordPrx not being able to call the subscribe function in mode C.
If this is the case, what is the best way to get a valid ValueRecordPrx proxy for calling its subscribe function?

The subscribe function is as follows:
void ValueRecordI::subscribe(const ValueObserverPrx& observer, const Ice::Current&)
{
	Lock sync(*this);

	ValueObserverPrx ObserverPrx = ValueObserverPrx::uncheckedCast(
		_topic->subscribeAndGetPublisher(IceStorm::QoS(), observer));
	ObserverPrx->SetValue(_value);
}

The ice file is as follows:
#pragma once

module PubSubIce
{

class Value
{
	string Serialise();
};

class ValueDate extends Value
{
	double mDate;
};

class ValueString extends Value
{
	string mString;
};

class ValueNumber extends Value
{
	double mNumber;
};

sequence<Value> ValueVec;

class ValueMatrix extends Value
{
	int mCols;
	int mRows;

	ValueVec mValueVec;
};

class ValueCurveImpl extends Value
{
	ValueDate mStartDate;
	ValueMatrix mPoints;
	ValueMatrix mAdjustments;
};


interface ValueObserver
{
	void SetValue(Value value1);
	Value GetValue();
};

interface ValueRecord
{
    void subscribe(ValueObserver* observer);
    void unsubscribe(ValueObserver* observer);

	void publish(Value value1);
};


};

Comments

  • Some configuration files

    Server config
    #
    # The endpoints for the value object.
    #
    Value.Endpoints=default -h ldn-analyt-29 -p 12000
    
    #
    # This property is used by the value server to connect with IceStorm.
    #
    TopicManager.Proxy=DemoIceStorm/TopicManager:default -h ldn-analyt-009 -p 10000
    
    #
    # Network Tracing
    #
    # 0 = no network tracing
    # 1 = trace connection establishment and closure
    # 2 = like 1, but more detailed
    # 3 = like 2, but also trace data transfer
    #
    #Ice.Trace.Network=1
    
    #
    # This property is used by the clients to connect to IceStorm.
    #
    Value.Proxy=value:default -h ldn-analyt-29 -p 12000
    


    Icebox
    #
    # The IceBox server endpoint configuration
    #
    IceBox.ServiceManager.Endpoints=tcp -h ldn-analyt-009 -p 9998
    
    #
    # The IceStorm service
    #
    IceBox.Service.IceStorm=IceStormService,35:createIceStorm --Ice.Config=config.service
    
    #
    # Warn about connection exceptions
    #
    Ice.Warn.Connections=1
    
    #
    # Network Tracing
    #
    # 0 = no network tracing
    # 1 = trace connection establishment and closure
    # 2 = like 1, but more detailed
    # 3 = like 2, but also trace data transfer
    #
    Ice.Trace.Network=3
    
    #
    # Protocol Tracing
    #
    # 0 = no protocol tracing
    # 1 = trace protocol messages
    #
    Ice.Trace.Protocol=1
    

    config.service
    #
    # The IceStorm service instance name.
    #
    IceStorm.InstanceName=DemoIceStorm
    
    #
    # This property defines the endpoints on which the IceStorm
    # TopicManager listens.
    #
    IceStorm.TopicManager.Endpoints=default -h ldn-analyt-009 -p 10000
    #
    # This property defines the endpoints on which the topic
    # publisher objects listen.
    #
    IceStorm.Publish.Endpoints=default -h ldn-analyt-009
    
    #
    # TopicManager Tracing
    #
    # 0 = no tracing
    # 1 = trace topic creation, subscription, unsubscription
    # 2 = like 1, but with more detailed subscription information
    #
    IceStorm.Trace.TopicManager=2
    
    #
    # Topic Tracing
    #
    # 0 = no tracing
    # 1 = trace unsubscription diagnostics
    # 2 = like 1, but with more detailed subscription information
    #
    IceStorm.Trace.Topic=2
    
    #
    # Subscriber Tracing
    #
    # 0 = no tracing
    # 1 = subscriber diagnostics (subscription, unsubscription, event
    #     propagation failures)
    # 2 = like 1, but with more detailed subscription information
    #
    IceStorm.Trace.Subscriber=2
    
    #
    # Amount of time in milliseconds between flushes for batch mode
    # transfer. The minimum allowable value is 100ms.
    #
    IceStorm.Flush.Timeout=2000
    
    #
    # Network Tracing
    #
    # 0 = no network tracing
    # 1 = trace connection establishment and closure
    # 2 = like 1, but more detailed
    # 3 = like 2, but also trace data transfer
    #
    Ice.Trace.Network=3
    
    #
    # This property defines the home directory of the Freeze 
    # database environment for the IceStorm service.
    #
    Freeze.DbEnv.IceStorm.DbHome=db
    
  • xdm
    xdm La Coruña, Spain
    Hi,

    Is not clear what is your goal, can you clarify what are you trying to do?


    As for your code:

    In case A you call the subscribe method of your object using a proxy, that in turn call the subscribe method of the topic, this will work if the proxy denoted by "strRecordName" is valid. Note that here the ValueProxy proxy is not a proxy to the publisher object so you cannot use this proxy to public messages to the topic.

    In case B you subscribe to the topic and get a publisher proxy for that topic.

    in case C, you publish a message 'subscribe' to a topic, all subscribers of the topic will receive this message, and in your implementation you subscribe again to the topic, this doesn't make sense to me.
  • My Goal

    Thanks for getting back to me so quickly.

    I'm sorry my goal is un clear, I'll try to clarify.

    We have an in house application for calculating prices of financial instruments. This application looks very much like a spreadsheet application. Cells display values calculated by it's formula. These values can be arbitrarily complex data structures.

    The goal is to be able to publish any cells values, via a new cell function that I am now developing, i.e. a publish cell function taking a value and a topic name. I also need a request cell function which subscribes to the desired value, or topic. When the published value changes, the requester requires the updated value.

    So any cell in the spreadsheet can do a named publish, via the new cell publish cell function.
    Also any cell can request a published value using the request cell function and the desired name for the value required. Typically the request is made in a different instance of the application and on a different computer than the publish.

    The CListenerIce* pListener in the code represents the listener to a cell, and handles the updating of the cell when the publisher does an update of its value and the subscriber gets the update.

    With all other code unchanged and only implementing one of the code fragments designated as Mode A, B or C, I get the results as mentioned. nothing represents no data received by the requester, update represents updates only and no initial snapshot, snapshot + update is my goal.

    I think from your reply, although I understand that you were unclear about my intentions, Mode A is what I need to focus on to solve my problem.

    My problem may be due to not understanding the difference in the ValueProxy of Mode A and Mode C. I assumed they should be the same proxy. In Mode A the subscribe function is called, but in Mode C it is not.

    Mode A works perfectly only if the publisher and the subscriber are cells in the same instance of the application, regardless of whether IceStorm is running on a different computer or not.
    As soon as the publisher and subscriber are on different instances of the application on different computers, the subscriber gets no snapshot or updates.

    I am instantiating an Ice::Communicator and passing this communicator to both the Publisher class and the subscriber class, is this valid or do they need their own instances of a communicator.

    Cheers
    Gordon
  • xdm
    xdm La Coruña, Spain
    Hi,

    In A ValueProxy is a proxy that points to a ValueRecord servant, if you invoke a method in this proxy the message is dispatched to this servant.

    In C ValueProxy is a proxy that points to the topic publisher object, when you invoke a method in this proxy you send an message to IceStorm, and the message will be deliver to all the subscribers of this topic.

    So even you use a proxy of the same type in A and C, your are communicating with different objects, in A you are directly sending a message to a ValueRecord servant, and in C you are sending a message to an IceStorm topic.

    You don't really want C, as for why A is failing some times, seems like a configuration issue, you should start by enabling Network tracing and check the messages are sent to the expected address.
  • Hi,

    I've got all tracing on for Icebox and IceStorm.

    I'm not sure how to capture the trace messages in my application as it is not a console application. I have a logging class which accepts std::string but not sure I can use this to log ICE trace messages. Can they be easily redirected to a file?

    This looks to be the point of failure.
    An exception is thrown when I call subscribe in the code
    auto ValueProxy = ValueRecordPrx::uncheckedCast(m_CommunicatorIce->stringToProxy(strRecordName));
    ValueProxy->subscribe(observerValue);


    The exception is
    Ice::NoEndpointException:
    no suitable endpoint available for proxy `value -t -e 1.1'

    In this case the topic is 'value'. I'm not sure why the exception reports ' -t -e 1.1' appended to the topic name, could this be a problem or is it normal?

    This does not happen when the subscriber and publisher are in the same application and sharing an ICE::Communicator.
  • xdm
    xdm La Coruña, Spain
    You can redirect the logger output to a file using Ice.LogFile property.

    As for the NoEndpointException seems your proxy doesn't contains any enpoints, this works when the communicator is shared because of collocation optimization.

    You should include the server endpoints when you create the proxy, something like this should work:
    auto ValueProxy = ValueRecordPrx::uncheckedCast(
        m_CommunicatorIce->stringToProxy(strRecordName + ":default -h ldn-analyt-29 -p 12000"));