diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index ae16ece..70f3cf2 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -25,6 +25,7 @@ namespace IceInternal class BasicStream; class LocalExceptionWrapper; class Outgoing; +class RetryTask; class ICE_API OutgoingAsyncMessageCallback : virtual public IceUtil::Shared { @@ -49,6 +50,8 @@ public: protected: + friend class ::IceInternal::RetryTask; + void __acquireCallback(const Ice::ObjectPrx&); void __releaseCallback(const Ice::LocalException&); void __releaseCallback() @@ -83,6 +86,7 @@ public: void __finished(const Ice::LocalException&); void __finished(const LocalExceptionWrapper&); + void __retry(int); bool __send(); protected: diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index 8a1e5f4..b26a194 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -285,6 +286,19 @@ IceInternal::Instance::endpointHostResolver() return _endpointHostResolver; } +RetryQueuePtr +IceInternal::Instance::retryQueue() +{ + IceUtil::RecMutex::Lock sync(*this); + + if(_state == StateDestroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + + return _retryQueue; +} + IceUtil::TimerPtr IceInternal::Instance::timer() { @@ -991,6 +1005,8 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _servantFactoryManager = new ObjectFactoryManager(); _objectAdapterFactory = new ObjectAdapterFactory(this, communicator); + + _retryQueue = new RetryQueue(this); if(_initData.wstringConverter == 0) { @@ -1039,6 +1055,7 @@ IceInternal::Instance::~Instance() assert(!_serverThreadPool); assert(!_selectorThread); assert(!_endpointHostResolver); + assert(!_retryQueue); assert(!_timer); assert(!_routerManager); assert(!_locatorManager); @@ -1200,6 +1217,11 @@ IceInternal::Instance::destroy() _outgoingConnectionFactory->waitUntilFinished(); } + if(_retryQueue) + { + _retryQueue->destroy(); + } + ThreadPoolPtr serverThreadPool; ThreadPoolPtr clientThreadPool; SelectorThreadPtr selectorThread; @@ -1210,6 +1232,7 @@ IceInternal::Instance::destroy() _objectAdapterFactory = 0; _outgoingConnectionFactory = 0; + _retryQueue = 0; if(_connectionMonitor) { diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index f0aa50a..721b347 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -71,6 +72,7 @@ public: ThreadPoolPtr serverThreadPool(); SelectorThreadPtr selectorThread(); EndpointHostResolverPtr endpointHostResolver(); + RetryQueuePtr retryQueue(); IceUtil::TimerPtr timer(); EndpointFactoryManagerPtr endpointFactoryManager() const; DynamicLibraryListPtr dynamicLibraryList() const; @@ -134,6 +136,7 @@ private: ThreadPoolPtr _serverThreadPool; SelectorThreadPtr _selectorThread; EndpointHostResolverPtr _endpointHostResolver; + RetryQueuePtr _retryQueue; IceUtil::TimerPtr _timer; EndpointFactoryManagerPtr _endpointFactoryManager; DynamicLibraryListPtr _dynamicLibraryList; diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile index f236dd4..dbf931e 100644 --- a/cpp/src/Ice/Makefile +++ b/cpp/src/Ice/Makefile @@ -80,6 +80,7 @@ OBJS = Acceptor.o \ Proxy.o \ ReferenceFactory.o \ Reference.o \ + RetryQueue.o \ RequestHandler.o \ RouterInfo.o \ Router.o \ diff --git a/cpp/src/Ice/Makefile.mak b/cpp/src/Ice/Makefile.mak index 86800d9..314c3d3 100644 --- a/cpp/src/Ice/Makefile.mak +++ b/cpp/src/Ice/Makefile.mak @@ -81,6 +81,7 @@ OBJS = Acceptor.obj \ Proxy.obj \ ReferenceFactory.obj \ Reference.obj \ + RetryQueue.obj \ RequestHandler.obj \ RouterInfo.obj \ Router.obj \ diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 14da199..c866fe1 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -24,6 +24,7 @@ #include #include #include +#include using namespace std; using namespace Ice; @@ -454,6 +455,24 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& ex) } } +void +IceInternal::OutgoingAsync::__retry(int interval) +{ + // + // This method is called by the proxy to retry an invocation, no + // other threads can access this object. + // + if(interval > 0) + { + assert(__os); + __os->instance()->retryQueue()->add(this, interval); + } + else + { + __send(); + } +} + bool IceInternal::OutgoingAsync::__send() { @@ -466,11 +485,11 @@ IceInternal::OutgoingAsync::__send() } catch(const LocalExceptionWrapper& ex) { - handleException(ex); + handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously } catch(const Ice::LocalException& ex) { - handleException(ex); + handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously } return _sentSynchronously; } @@ -483,6 +502,7 @@ IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operat _delegate = 0; _cnt = 0; _mode = mode; + _sentSynchronously = false; // // Can't call async via a batch proxy. diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp index 2fc5316..eeb554d 100644 --- a/cpp/src/Ice/ProxyFactory.cpp +++ b/cpp/src/Ice/ProxyFactory.cpp @@ -25,32 +25,8 @@ using namespace std; using namespace Ice; using namespace IceInternal; -namespace -{ - -class RetryTask : public IceUtil::TimerTask -{ -public: - - RetryTask(const OutgoingAsyncPtr& out) : _out(out) - { - } - - virtual void - runTimerTask() - { - _out->__send(); - } - -private: - - const OutgoingAsyncPtr _out; -}; - -} - IceUtil::Shared* IceInternal::upCast(ProxyFactory* p) { return p; } - + ObjectPrx IceInternal::ProxyFactory::stringToProxy(const string& str) const { @@ -243,34 +219,17 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, } out << " because of exception\n" << ex; } - - if(interval > 0) + + if(out) { - if(out) - { - try - { - _instance->timer()->schedule(new RetryTask(out), IceUtil::Time::milliSeconds(interval)); - } - catch(const IceUtil::IllegalArgumentException&) // Expected if the communicator destroyed the timer. - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - } - else - { - // - // Sleep before retrying. - // - IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); - } + out->__retry(interval); } - else + else if(interval > 0) { - if(out) - { - out->__send(); - } + // + // Sleep before retrying. + // + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); } } diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp new file mode 100644 index 0000000..d6aba62 --- /dev/null +++ b/cpp/src/Ice/RetryQueue.cpp @@ -0,0 +1,92 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include +#include +#include +#include + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; } + +IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const OutgoingAsyncPtr& outAsync) : + _queue(queue), _outAsync(outAsync) +{ +} + +void +IceInternal::RetryTask::runTimerTask() +{ + if(_queue->remove(this)) + { + try + { + _outAsync->__send(); + } + catch(const Ice::LocalException& ex) + { + _outAsync->__releaseCallback(ex); + } + } +} + +void +IceInternal::RetryTask::destroy() +{ + _outAsync->__releaseCallback(CommunicatorDestroyedException(__FILE__, __LINE__)); +} + +bool +IceInternal::RetryTask::operator<(const RetryTask& rhs) const +{ + return this < &rhs; +} + +IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(instance) +{ +} + +void +IceInternal::RetryQueue::add(const OutgoingAsyncPtr& out, int interval) +{ + Lock sync(*this); + RetryTaskPtr task = new RetryTask(this, out); + try + { + _instance->timer()->schedule(task, IceUtil::Time::milliSeconds(interval)); + } + catch(const IceUtil::IllegalArgumentException&) // Expected if the communicator destroyed the timer. + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + _requests.insert(task); +} + +void +IceInternal::RetryQueue::destroy() +{ + Lock sync(*this); + for(set::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + _instance->timer()->cancel(*p); + (*p)->destroy(); + } + _requests.clear(); +} + +bool +IceInternal::RetryQueue::remove(const RetryTaskPtr& task) +{ + Lock sync(*this); + return _requests.erase(task) > 0; +} + diff --git a/cpp/src/Ice/RetryQueue.h b/cpp/src/Ice/RetryQueue.h new file mode 100644 index 0000000..960b4a8 --- /dev/null +++ b/cpp/src/Ice/RetryQueue.h @@ -0,0 +1,62 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_RETRY_QUEUE_H +#define ICE_RETRY_QUEUE_H + +#include +#include +#include +#include +#include +#include + +namespace IceInternal +{ + +class RetryTask : public IceUtil::TimerTask +{ +public: + + RetryTask(const RetryQueuePtr&, const OutgoingAsyncPtr&); + + virtual void runTimerTask(); + void destroy(); + + bool operator<(const RetryTask&) const; + +private: + + const RetryQueuePtr _queue; + const OutgoingAsyncPtr _outAsync; +}; +typedef IceUtil::Handle RetryTaskPtr; + +class RetryQueue : public IceUtil::Shared, public IceUtil::Mutex +{ +public: + + RetryQueue(const InstancePtr&); + + void add(const OutgoingAsyncPtr&, int); + void destroy(); + +private: + + bool remove(const RetryTaskPtr&); + friend class RetryTask; + + const InstancePtr _instance; + std::set _requests; +}; + +} + +#endif + diff --git a/cpp/src/Ice/RetryQueueF.h b/cpp/src/Ice/RetryQueueF.h new file mode 100644 index 0000000..0e99fd7 --- /dev/null +++ b/cpp/src/Ice/RetryQueueF.h @@ -0,0 +1,24 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_RETRY_QUEUE_F_H +#define ICE_RETRY_QUEUE_F_H + +#include + +namespace IceInternal +{ + +class RetryQueue; +IceUtil::Shared* upCast(RetryQueue*); +typedef Handle RetryQueuePtr; + +} + +#endif diff --git a/cs/src/Ice/Instance.cs b/cs/src/Ice/Instance.cs index 2c9012e..de098da 100644 --- a/cs/src/Ice/Instance.cs +++ b/cs/src/Ice/Instance.cs @@ -216,6 +216,20 @@ namespace IceInternal } } + public RetryQueue + retryQueue() + { + lock(this) + { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + return _retryQueue; + } + } + public Timer timer() { @@ -742,7 +756,9 @@ namespace IceInternal _servantFactoryManager = new ObjectFactoryManager(); _objectAdapterFactory = new ObjectAdapterFactory(this, communicator); - + + _retryQueue = new RetryQueue(this); + string[] facetFilter = _initData.properties.getPropertyAsList("Ice.Admin.Facets"); if(facetFilter.Length > 0) { @@ -886,6 +902,11 @@ namespace IceInternal _outgoingConnectionFactory.waitUntilFinished(); } + if(_retryQueue != null) + { + _retryQueue.destroy(); + } + ThreadPool serverThreadPool = null; ThreadPool clientThreadPool = null; EndpointHostResolver endpointHostResolver = null; @@ -893,9 +914,9 @@ namespace IceInternal lock(this) { _objectAdapterFactory = null; - _outgoingConnectionFactory = null; - + _retryQueue = null; + if(_connectionMonitor != null) { _connectionMonitor.destroy(); @@ -1032,6 +1053,7 @@ namespace IceInternal private ThreadPool _serverThreadPool; private EndpointHostResolver _endpointHostResolver; private Timer _timer; + private RetryQueue _retryQueue; private bool _background; private EndpointFactoryManager _endpointFactoryManager; private Ice.PluginManager _pluginManager; diff --git a/cs/src/Ice/Makefile b/cs/src/Ice/Makefile index f8e1ff0..f8d27c5 100644 --- a/cs/src/Ice/Makefile +++ b/cs/src/Ice/Makefile @@ -76,6 +76,7 @@ SRCS = Acceptor.cs \ ReferenceFactory.cs \ ReplyStatus.cs \ RequestHandler.cs \ + RetryQueue.cs \ RouterInfo.cs \ ServantManager.cs \ Set.cs \ diff --git a/cs/src/Ice/Makefile.mak b/cs/src/Ice/Makefile.mak index a082e51..07ebcc0 100644 --- a/cs/src/Ice/Makefile.mak +++ b/cs/src/Ice/Makefile.mak @@ -76,6 +76,7 @@ SRCS = Acceptor.cs \ ReferenceFactory.cs \ ReplyStatus.cs \ RequestHandler.cs \ + RetryQueue.cs \ RouterInfo.cs \ ServantManager.cs \ Set.cs \ diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs index 516fc70..ea261d6 100644 --- a/cs/src/Ice/OutgoingAsync.cs +++ b/cs/src/Ice/OutgoingAsync.cs @@ -74,7 +74,7 @@ namespace IceInternal } } - protected void releaseCallback__(Ice.LocalException ex) + public void releaseCallback__(Ice.LocalException ex) { Debug.Assert(os__ != null); @@ -99,7 +99,7 @@ namespace IceInternal } } - protected void releaseCallback__() + protected void releaseCallback__() { lock(monitor__) { @@ -380,6 +380,23 @@ namespace IceInternal } } + public void retry__(int interval) + { + // + // This method is called by the proxy to retry an + // invocation. No other threads can access this object. + // + if(interval > 0) + { + Debug.Assert(os__ != null); + os__.instance().retryQueue().add(this, interval); + } + else + { + send__(); + } + } + public bool send__() { try @@ -409,7 +426,8 @@ namespace IceInternal _delegate = null; _cnt = 0; _mode = mode; - + _sentSynchronously = false; + // // Can't call async via a batch proxy. // diff --git a/cs/src/Ice/ProxyFactory.cs b/cs/src/Ice/ProxyFactory.cs index 034a1ca..26603ea 100644 --- a/cs/src/Ice/ProxyFactory.cs +++ b/cs/src/Ice/ProxyFactory.cs @@ -13,21 +13,6 @@ namespace IceInternal { public sealed class ProxyFactory { - private sealed class RetryTask : TimerTask - { - internal RetryTask(OutgoingAsync outAsync) - { - _outAsync = outAsync; - } - - public void runTimerTask() - { - _outAsync.send__(); - } - - private OutgoingAsync _outAsync; - } - public Ice.ObjectPrx stringToProxy(string str) { Reference r = instance_.referenceFactory().create(str, null); @@ -209,26 +194,16 @@ namespace IceInternal logger.trace(traceLevels.retryCat, s); } - if(interval > 0) + if(outAsync != null) { - if(outAsync != null) - { - instance_.timer().schedule(new RetryTask(outAsync), interval); - } - else - { - // - // Sleep before retrying. - // - System.Threading.Thread.Sleep(interval); - } + outAsync.retry__(interval); } - else + else if(interval > 0) { - if(outAsync != null) - { - outAsync.send__(); - } + // + // Sleep before retrying. + // + System.Threading.Thread.Sleep(interval); } } diff --git a/cs/src/Ice/RetryQueue.cs b/cs/src/Ice/RetryQueue.cs new file mode 100644 index 0000000..b5e6564 --- /dev/null +++ b/cs/src/Ice/RetryQueue.cs @@ -0,0 +1,90 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +namespace IceInternal +{ + using System.Collections.Generic; + + public class RetryTask : TimerTask + { + public RetryTask(RetryQueue retryQueue, OutgoingAsync outAsync) + { + _retryQueue = retryQueue; + _outAsync = outAsync; + } + + public void runTimerTask() + { + if(_retryQueue.remove(this)) + { + try + { + _outAsync.send__(); + } + catch(Ice.LocalException ex) + { + _outAsync.releaseCallback__(ex); + } + } + } + + public void destroy() + { + _outAsync.releaseCallback__(new Ice.CommunicatorDestroyedException()); + } + + private RetryQueue _retryQueue; + private OutgoingAsync _outAsync; + }; + + public class RetryQueue + { + public RetryQueue(Instance instance) + { + _instance = instance; + } + + public void add(OutgoingAsync outAsync, int interval) + { + lock(this) + { + RetryTask task = new RetryTask(this, outAsync); + _instance.timer().schedule(task, interval); + _requests.Add(task, null); + } + } + + public void + destroy() + { + lock(this) + { + foreach(RetryTask task in _requests.Keys) + { + _instance.timer().cancel(task); + task.destroy(); + } + _requests.Clear(); + } + } + + public bool + remove(RetryTask task) + { + lock(this) + { + return _requests.Remove(task); + } + } + + private Instance _instance; + private Dictionary _requests = new Dictionary(); + } +} + diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 574ded0..261adfd 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -908,7 +908,7 @@ public class ObjectPrxHelperBase implements ObjectPrx if(out != null) { - out.__send(cnt); + out.__send(); } return cnt; diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 1ee242b..2578f5b 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -201,6 +201,17 @@ public final class Instance return _endpointHostResolver; } + synchronized public RetryQueue + retryQueue() + { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + return _retryQueue; + } + synchronized public Timer timer() { @@ -713,6 +724,8 @@ public final class Instance _objectAdapterFactory = new ObjectAdapterFactory(this, communicator); + _retryQueue = new RetryQueue(this); + // // Add Process and Properties facets // @@ -753,6 +766,7 @@ public final class Instance IceUtilInternal.Assert.FinalizerAssert(_locatorManager == null); IceUtilInternal.Assert.FinalizerAssert(_endpointFactoryManager == null); IceUtilInternal.Assert.FinalizerAssert(_pluginManager == null); + IceUtilInternal.Assert.FinalizerAssert(_retryQueue == null); super.finalize(); } @@ -868,6 +882,11 @@ public final class Instance { _outgoingConnectionFactory.waitUntilFinished(); } + + if(_retryQueue != null) + { + _retryQueue.destroy(); + } ThreadPool serverThreadPool = null; ThreadPool clientThreadPool = null; @@ -877,8 +896,8 @@ public final class Instance synchronized(this) { _objectAdapterFactory = null; - _outgoingConnectionFactory = null; + _retryQueue = null; if(_connectionMonitor != null) { @@ -1054,6 +1073,7 @@ public final class Instance private ThreadPool _serverThreadPool; private SelectorThread _selectorThread; private EndpointHostResolver _endpointHostResolver; + private RetryQueue _retryQueue; private Timer _timer; private EndpointFactoryManager _endpointFactoryManager; private Ice.PluginManager _pluginManager; diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 9359fe4..12820c9 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -270,14 +270,22 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback } public final void - __send(int cnt) + __retry(int cnt, int interval) { // // This method is called by the proxy to retry an invocation. It's safe to update // the count here without synchronization, no other threads can access this object. // _cnt = cnt; - __send(); + if(interval > 0) + { + assert(__os != null); + __os.instance().retryQueue().add(this, interval); + } + else + { + __send(); + } } public final boolean @@ -292,11 +300,11 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback } catch(LocalExceptionWrapper ex) { - handleException(ex); + handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously } catch(Ice.LocalException ex) { - handleException(ex); + handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously } return _sentSynchronously; } @@ -310,6 +318,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback _delegate = null; _cnt = 0; _mode = mode; + _sentSynchronously = false; // // Can't call async via a batch proxy. diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java index ac05fe1..1f24976 100644 --- a/java/src/IceInternal/ProxyFactory.java +++ b/java/src/IceInternal/ProxyFactory.java @@ -132,7 +132,7 @@ public final class ProxyFactory if(out != null) { - out.__send(cnt); + out.__retry(cnt, 0); } return cnt; // We must always retry, so we don't look at the retry count. } @@ -205,42 +205,23 @@ public final class ProxyFactory logger.trace(traceLevels.retryCat, s); } - if(interval > 0) + if(out != null) { - if(out != null) - { - final int count = cnt; - _instance.timer().schedule(new TimerTask() - { - public void - runTimerTask() - { - out.__send(count); - } - }, interval); - } - else - { - // - // Sleep before retrying. - // - try - { - Thread.currentThread().sleep(interval); - } - catch(InterruptedException ex1) - { - } - } + out.__retry(cnt, interval); } - else + else if(interval > 0) { - if(out != null) + // + // Sleep before retrying. + // + try + { + Thread.currentThread().sleep(interval); + } + catch(InterruptedException ex1) { - out.__send(cnt); } } - return cnt; } diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java new file mode 100644 index 0000000..0e0065a --- /dev/null +++ b/java/src/IceInternal/RetryQueue.java @@ -0,0 +1,49 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class RetryQueue +{ + RetryQueue(Instance instance) + { + _instance = instance; + } + + synchronized public void + add(OutgoingAsync outAsync, int interval) + { + RetryTask task = new RetryTask(this, outAsync); + _instance.timer().schedule(task, interval); + _requests.add(task); + } + + synchronized public void + destroy() + { + java.util.Iterator p = _requests.iterator(); + while(p.hasNext()) + { + RetryTask task = p.next(); + _instance.timer().cancel(task); + task.destroy(); + } + _requests.clear(); + } + + synchronized boolean + remove(RetryTask task) + { + return _requests.remove(task); + } + + final private Instance _instance; + final private java.util.HashSet _requests = new java.util.HashSet(); +}; + diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java new file mode 100644 index 0000000..5c17c8a --- /dev/null +++ b/java/src/IceInternal/RetryTask.java @@ -0,0 +1,44 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +class RetryTask implements TimerTask +{ + RetryTask(RetryQueue queue, OutgoingAsync outAsync) + { + _queue = queue; + _outAsync = outAsync; + } + + public void + runTimerTask() + { + if(_queue.remove(this)) + { + try + { + _outAsync.__send(); + } + catch(Ice.LocalException ex) + { + _outAsync.__releaseCallback(ex); + } + } + } + + public void + destroy() + { + _outAsync.__releaseCallback(new Ice.CommunicatorDestroyedException()); + } + + private final RetryQueue _queue; + private final OutgoingAsync _outAsync; +}