Archived

This forum has been archived. Please start a new discussion on GitHub.

IceStorm Thread

Hey I'm checking the IceStorm clock demo, and I'm just curious about the Thread in IceStorm

In Subscribe.java, there are 4 lines of code that I'm interested in. These are the code:
adapter.activate();
shutdownOnInterrupt();
communicator().waitForShutdown();
topic.unsubscribe(subscriber);

And my question is, I can't do anything in my main method after I instantiate the Subscribe class.
Subscriber app = new Subscriber();
int status = app.main("Subscriber", args, "config.sub");

// this won't happen
while(true)
{

}

I'm guessing that's because of the communicator().waitForShutdown();
Is there any way that I could subscribe to a topic and at the same time I'm still able to do any operation?

Comments

  • xdm
    xdm La Coruña, Spain
    Hi,

    Ice.Application.main doesn't return until run method exists, and when this happens Ice.Application destroy the communicator and associated resources.

    The call to waitForShutdown is there so the server doesn't terminate.

    Depending of what you want to do you can put your code inside run before the call to waitForShutodown or don't use Ice.Application, see demo/Ice/minimal for an example of an application that doesn't use Ice.Application.

    Can you please also update your project description "IceStorm" is not a project.
  • Thanks Jose for you suggestion. I've change the code and I run into another problem.

    This is what I have for the Subscribe class
    String a[];
    	
    	public EnergyUser(String args[]){
    		a=args;
    	}
    	
    
    
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		String topicName = "Energy";
            String option = "None";
            boolean batch = false;
            String id = null;
            String retryCount = null;
            int i;
           
            Ice.Communicator communicator = Ice.Util.initialize(a);
    
            IceStorm.TopicManagerPrx manager = IceStorm.TopicManagerPrxHelper.checkedCast(
                communicator.stringToProxy("DemoIceStorm/TopicManager:default -h localhost -p 10000"));
            if(manager == null)
            {
                System.err.println("invalid proxy");
                
            }
    
            //
            // Retrieve the topic.
            //
            IceStorm.TopicPrx topic = null;
            try
            {
                topic = manager.retrieve(topicName);
            }
            catch(IceStorm.NoSuchTopic e)
            {
                try
                {
                    topic = manager.create(topicName);
                }
                catch(IceStorm.TopicExists ex)
                {
                    System.err.println(Ice.Application.appName() + ": temporary failure, try again.");
                    
                }
            }
    
            Ice.ObjectAdapter adapter = communicator.createObjectAdapterWithEndpoints("Energy.Subscriber.Endpoints", "tcp -h localhost -p 10002");
    
            
            //
            Ice.Identity subId = new Ice.Identity(id, "");
            if(subId.name == null)
            {
                subId.name = java.util.UUID.randomUUID().toString();
            }
            Ice.ObjectPrx subscriber = adapter.add(new EnergyUserI(), subId);
    
            java.util.Map<String, String> qos = new java.util.HashMap<String, String>();
            if(retryCount != null)
            {
                qos.put("retryCount", retryCount);
            }
            
           
            try
            {
                topic.subscribeAndGetPublisher(qos, subscriber);
            }
            catch(IceStorm.AlreadySubscribed e)
            {
                
                if(id == null)
                {
                    e.printStackTrace();
                    
                }
                else
                {
                    System.out.println("reactivating persistent subscriber");
                }
            }
            catch(IceStorm.BadQoS e)
            {
                e.printStackTrace();
                
            }
            adapter.activate();
    
            Ice.Application.shutdownOnInterrupt();
            communicator.waitForShutdown();
            topic.unsubscribe(subscriber);
            communicator.destroy();
            
    		
    	}
    

    However when I tried to stop the program from eclipse, I got eclipse error. It says "Terminate failed". And another thing, when I tried to run the code again, it always give me connection refused error. That's because the port I used has not been free yet. I've user communicator.destroy() on the bottom of my code to free all the connection but somehow it won't work.
  • xdm
    xdm La Coruña, Spain
    You must not use Ice.Application.shutdownOnInterrupt if you are not using Ice.Application.

    To gracefully shutdown the server you will need to add a shutdown hook of your own, it should be something along those lines.
    adapter.activate();
    final Thread thread = Thread.currentThread();
    Runtime.getRuntime().addShutdownHook(new Thread()
    {
        public void run()
        {
            communicator.shutdown();
            while(true)
            {
                try
                {
                    thread.join();
                    break;
                }
                catch(InterruptedException ex)
                {
                }
            }
        }
    });
    communicator.waitForShutdown();
    communicator.destroy();
    
  • Thanks a lot Jose, I got everything working fine