IceStorm - Is it possible to log all published messages to files?

Is it possible to log all messages published to an IceStorm service without having to create subscribers for each topic?

Comments

  • mes
    mes California
    Hi,

    Can you explain a little more about what you're trying to achieve?

    Thanks,
    Mark
  • We have an application where systems (some real and some simulations) are publishing command data, telemetry, heartbeat and other kinds of information to an IceStorm service. We would like to record all this information in log files to be accessed later, so that we can test some of our applications in offline mode.

    Essentially, we want to know if it is possible to automatically dump all messages going through the IceStorm server to a file.
  • mes
    mes California
    Hi,

    Ice provides some tracing options. For example, protocol tracing (enabled via the Ice.Trace.Protocol property) displays information such as the operation name, target identity, and request contexts.

    However, it sounds like you're asking if it's possible to record the binary data for each incoming protocol message, so that you can "replay" the messages again. I'm afraid neither Ice nor IceStorm provides such a feature.

    It should be possible to implement this, but it's a non-trivial project. For example, you could write a custom implementation of the Ice::Router interface (i.e., a mini Glacier2 router) whose only purpose is dumping the incoming request "blobs" to a file and then forwarding them on to the IceStorm service. This router could even be collocated in the same IceBox server as your IceStorm service. Of course, this would also mean that your publishers must be configured to use this intermediary router instead of communicating directly with IceStorm.

    Regards,
    Mark
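
The "dump, then forward" idea from the reply above can be sketched without any Ice code at all. The following is only an illustration of the pattern, not an Ice::Router implementation: `RecordingForwarder`, `read_records`, the `forward` callable, and the on-disk record layout are all hypothetical names and choices made for this sketch. In a real intermediary, `forward` would presumably wrap a dynamic invocation on the IceStorm proxy.

```python
import io
import struct
import time


class RecordingForwarder(object):
    """Write each opaque request blob to a log, then pass it on unchanged."""

    def __init__(self, log, forward):
        self.log = log          # binary file-like object opened for writing
        self.forward = forward  # hypothetical callable standing in for the real IceStorm call

    def handle(self, operation, blob):
        name = operation.encode("utf-8")
        # Record layout (an assumption of this sketch): timestamp (double),
        # name length and blob length (uint32 each), then name and blob bytes.
        self.log.write(struct.pack("<dII", time.time(), len(name), len(blob)))
        self.log.write(name)
        self.log.write(blob)
        return self.forward(operation, blob)


def read_records(log):
    """Yield (timestamp, operation, blob) tuples from a log written above."""
    header = struct.Struct("<dII")
    while True:
        raw = log.read(header.size)
        if len(raw) < header.size:
            return
        stamp, name_len, blob_len = header.unpack(raw)
        operation = log.read(name_len).decode("utf-8")
        yield stamp, operation, log.read(blob_len)


if __name__ == "__main__":
    demo_log = io.BytesIO()
    tee = RecordingForwarder(demo_log, lambda op, blob: True)
    tee.handle("tick", b"\x01\x02")
    demo_log.seek(0)
    print(list(read_records(demo_log)))
```

Because the blob is never interpreted, the forwarder does not need the Slice definitions of the messages it records.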
  • (hi Sriram! [he and I work on the same project])

    My goal was two-fold: 1) provide a human-readable log of messages on the IceStorm bus and 2) provide a machine-readable log that could be replayed onto the IceStorm bus for testing and demos.

    From what I understand, I can get a list of all topics from the TopicManager and then subscribe to each one with a "Blobject" servant. My intention was then to use the class loader to load the (slice2java-generated) class identified by Ice.Current.id, populate it using the InputStream interface, and finally use introspection or just call toString() on the object to dump it to a log.

    For replay, I'd just dump the inParams to a flat file or database or somesuch with a timestamp and the Ice.Current critical elements.

    However, it appears that the Current.id is empty for IceStorm messages (I discovered this today) so that ends that. I don't know if that is by design, or a bug.

    Thoughts, further comments, etc? Am I crazy in pursuing this? (I've only been doing Ice coding for a couple weeks now, and am doing this partially as a learning exercise.)
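
One possible shape for the replay log described above (the inParams plus a timestamp and the relevant Ice.Current fields) is one JSON object per line. This is a minimal sketch using only the standard library; the function names and the choice of fields (`topic`, `operation`, `mode`, `context`) are assumptions for illustration, not a format defined by Ice or IceStorm.

```python
import base64
import json
import time


def encode_record(topic, operation, mode, context, in_params, stamp=None):
    """Return one JSON line describing a single IceStorm event."""
    record = {
        "time": time.time() if stamp is None else stamp,
        "topic": topic,
        "operation": operation,
        "mode": mode,          # e.g. the name of the operation mode, as a string
        "context": context,    # request context as a plain dict of strings
        # The raw in-parameters are binary, so they are base64-encoded for JSON.
        "payload": base64.b64encode(in_params).decode("ascii"),
    }
    return json.dumps(record, sort_keys=True)


def decode_record(line):
    """Parse a line written by encode_record and restore the raw payload."""
    record = json.loads(line)
    record["payload"] = base64.b64decode(record["payload"])
    return record
```

A line-oriented format like this can be appended to while recording and inspected with ordinary text tools, while the payload itself stays opaque.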
  • Proof of concept

    Hi,

    I wrote a little proof of concept on how to "passively" log IceStorm messages. The example consists of two scripts: Recorder.py, which subscribes to all topics and records messages, and Player.py, which then replays them. The scripts aren't fancy in any way, and you need to press CTRL-C in Recorder to make it write the recorded messages. You should be able to test this easily with the demos that come with Ice.

    Recorder.py:
    #!/usr/bin/env python
    import sys, traceback, Ice, IceStorm, getopt, time, base64, json
    
    output = []
    
    class Logger(Ice.Blobject):
        def __init__(self, topic):
            self.topic = topic
    
        def ice_invoke(self, payload, current):
            global output
            output.append({"time":time.time(), "topic":self.topic, "operation":current.operation, "payload":base64.b64encode(payload)})
            return (True, buffer(""))
    
    class Recorder(Ice.Application):
        def run(self, args):
            manager = IceStorm.TopicManagerPrx.checkedCast(self.communicator().propertyToProxy('TopicManager.Proxy'))
    
            topics = {}
            for name in manager.retrieveAll(): topics[name] = {"topic":manager.retrieve(name)}
    
            adapter = self.communicator().createObjectAdapter("Recorder")
    
            qos = {"reliability":"ordered"}
            for name in topics.keys():
                id = Ice.generateUUID()
                topics[name]["subscriber"] = adapter.add(Logger(name), adapter.getCommunicator().stringToIdentity(id))
                topics[name]["topic"].subscribeAndGetPublisher(qos, topics[name]["subscriber"])
    
            adapter.activate()
            self.shutdownOnInterrupt()
            self.communicator().waitForShutdown()
            for name in topics.keys():
                topics[name]["topic"].unsubscribe(topics[name]["subscriber"])
                
            print json.dumps(output)
            return 0
    
    app = Recorder()
    sys.exit(app.main(sys.argv, "config"))
    

    Player.py:
    #!/usr/bin/env python
    import sys, traceback, time, Ice, IceStorm, getopt, json, base64
    
    class Player(Ice.Application):
        def run(self, args):
            data = json.loads(sys.stdin.read())
    
            publishers = {}
            manager = IceStorm.TopicManagerPrx.checkedCast(self.communicator().propertyToProxy('TopicManager.Proxy'))
            for rec in data:
                if not rec["topic"] in publishers:
                    try:
                        topic = manager.retrieve(rec["topic"])
                    except IceStorm.NoSuchTopic, e:
                        try:
                            topic = manager.create(rec["topic"])
                        except IceStorm.TopicExists, ex:
                            print self.appName() + ": temporary error. try again"
                            return 1
                    publishers[rec["topic"]] = topic.getPublisher()
                publishers[rec["topic"]].ice_invoke(rec["operation"], Ice.OperationMode.Normal, buffer(base64.b64decode(rec["payload"])), None)
                    
            return 0
    
    app = Player()
    sys.exit(app.main(sys.argv, "config"))
    

    config:
    Recorder.Endpoints=tcp
    TopicManager.Proxy=DemoIceStorm/TopicManager:default -p 10000
    

    Usage:
    python Recorder.py >output.json
    (CTRL-C)
    cat output.json | python Player.py
    

    Cheers,
    Michael
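
Player.py above replays the messages as fast as it can. If the original spacing between messages matters for a test or demo, the recorded timestamps can be used to pace the replay. The sketch below is an assumption-laden illustration: it assumes each record carries a numeric `time` field (rename the key if your log uses a different one), and `send` is a hypothetical callable that would wrap the `ice_invoke` call from Player.py.

```python
import time


def replay(records, send, speed=1.0, sleep=time.sleep):
    """Call send(record) for each record, preserving the recorded spacing.

    speed > 1.0 replays faster than real time; sleep is injectable so the
    pacing can be checked without actually waiting.
    """
    ordered = sorted(records, key=lambda rec: rec["time"])
    previous = None
    for rec in ordered:
        if previous is not None:
            delay = (rec["time"] - previous) / speed
            if delay > 0:
                sleep(delay)
        previous = rec["time"]
        send(rec)
```

Sorting by timestamp first keeps the replay order stable even if records from several topics were appended out of order.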
  • awesome!

    Thanks Michael, that's what I was thinking of. I'll try it out!
  • Wow, I hadn't come across the Dynamic Ice stuff before -- it seems very cool. :)

    I've been reading the docs and playing around with this a bit, and it seems like it's a nice trick to eavesdrop on IceStorm communication as it passes.

    One thing: is it possible to get the sequence of arg types from an Ice.InputStream as you go? I suspect not ... what I mean is, when presented with a byte[], can you do something like the following (WARNING, PSEUDOCODE!):
    Ice.InputStream is = new Ice.InputStream (bytes);
    for (int i = 0; i < is.numArgs(); i++) {
        switch (is.nextArgType) {
            case Int:
                // whatever
            ...
        }
    }
    
  • Answered my own question: no, you can't do this.
    The rules for encoding an enumerator reflect a core design principle in Ice: the sender and receiver must know the types that are encoded in a message. The Ice encoding is not self-describing; that is, it does not waste space embedding type information along with the data values. As a result, it's impossible to unmarshal the contents of a message unless you know what types to expect. Using enumerations as an example, you cannot know how many bytes encode an enumerator unless you know its Slice definition.

    From http://zeroc.com/newsletter/issue12.pdf

    Ah well, at least I can log the binary contents and manually reconstruct the objects later on if I need to. :)
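
The point of the quoted passage can be seen with plain byte packing. This sketch uses Python's `struct` module as a stand-in for the Ice encoding (it is not the Ice marshaling code): the same eight bytes decode to different values depending on which types the reader assumes.

```python
import struct

# Eight little-endian bytes with no type tags, as a sender might emit
# for two 32-bit integers.
payload = struct.pack("<ii", 1, 2)

# A reader that expects two ints recovers the original values ...
as_two_ints = struct.unpack("<ii", payload)   # (1, 2)

# ... but a reader that expects one 64-bit long gets something else,
# and nothing in the bytes says which reading is right.
as_one_long = struct.unpack("<q", payload)    # (8589934593,)
```

This is why a generic logger can store the bytes but cannot print them meaningfully without the matching Slice definitions.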
  • Agreed on your observation. However, I was thinking that if you maintain a mapping of Topic->Class (and you stick with one class per topic, or else have "Topic+Operation->Class+Method"), you could deserialize the stream. This requires access to all of the slice2java Prx classes and the manual mapping step, plus a lot of reflection work to determine the method parameters and decode them from the stream, but it should be possible.

    current.id contains a UUID. If there's an easy way to go from UUID to class, that would expedite the process a bit, but deserializing the method parameters is still fairly complex.
    mefoster wrote: »
    Answered my own question: no, you can't do this.



    From http://zeroc.com/newsletter/issue12.pdf

    Ah well, at least I can log the binary contents and manually reconstruct the objects later on if I need to. :)
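
The "Topic+Operation -> layout" mapping suggested above could look roughly like the following. It is a sketch only: the topics, operations, and parameter layouts in `DECODERS` are invented for illustration, and `struct` stands in for the generated unmarshaling code. It also ignores the encapsulation header and variable-length types that a real Ice payload may carry.

```python
import struct

# Hypothetical table: which fixed parameter layout each (topic, operation) uses.
DECODERS = {
    ("temperature", "report"): struct.Struct("<id"),  # int sensor id, double value
    ("heartbeat", "ping"): struct.Struct("<q"),       # long sequence number
}


def decode(topic, operation, payload):
    """Return the decoded parameters, or None when no layout is registered
    or the payload length does not match the registered layout."""
    layout = DECODERS.get((topic, operation))
    if layout is None or len(payload) != layout.size:
        return None
    return layout.unpack(payload)
```

The table has to be maintained by hand and kept in step with the Slice definitions, which is the manual mapping step mentioned above.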
  • Hi everyone ... Recorder/Player issue for IceStorm

    Hi cdknight, grembo and others,

    I have been working on a "Structure Health" project that publishes different kinds of information (temperature, etc.) to IceStorm under different topics. My task is to implement a recorder/player for this setup; basically, we want machine-readable data that can be replayed to IceStorm for testing and demos. I have read the conversation in this thread and grembo's proof of concept in Python, and I have tried to achieve something similar in C++.

    I altered the Publisher/Subscriber code from the Ice demos to implement a recorder/player for a single IceStorm topic, which went fine. The problem came when I tried to record data for all the topics present in IceStorm. My idea is to run one child thread per topic in the recorder, with each thread holding the topic manager proxy (for that topic) that it needs to receive data from IceStorm. However, Ice throws an "Already registered" exception every time I run this code, although I don't seem to be registering anything twice.

    I have two questions. First, can anyone tell me how to get around this "AlreadyRegistered" exception, or what mistake could be raising it?

    Second, the discussion in this thread ended three or four months ago, which suggests that either no one was able to implement the recorder/player or somebody did manage to do it. Could that "somebody" (or anyone else) please guide me on how to get this done, or give me some pointers (or even the code)? Any help is deeply appreciated.

    Regards,
    Hammad Kabir