Streaming data via SIMON RawChannel

Streaming data is easier as of SIMON 1.2.0, which adds two new classes:

http://dev.root1.de/project-sites/simon/apidocs/de/root1/simon/RawChannelInputStream.html
http://dev.root1.de/project-sites/simon/apidocs/de/root1/simon/RawChannelOutputStream.html

Wiring them into a SIMON session still takes some effort. Once that is done, you can transfer data via SIMON as with any other stream. You can even combine them with other streams (buffered, zipped, encrypted, and so on). In short:

  • Request a RawChannel
  • Pass data via the stream
  • Close the stream
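
The stream combination mentioned above can be sketched with plain java.io classes. This is a minimal sketch, not part of the SIMON samples: it assumes that RawChannelOutputStream and RawChannelInputStream behave like any other OutputStream and InputStream, and it uses in-memory byte array streams as stand-ins so that it runs without a SIMON session. The class and method names (StreamStackingSketch, sendCompressed, receiveCompressed, roundTrip) are made up for illustration.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class StreamStackingSketch {

    // Sender side: stack compression and buffering on top of any OutputStream.
    // In a SIMON session, "sink" would be the raw channel output stream (assumption).
    public static void sendCompressed(OutputStream sink, String message) throws IOException {
        // Closing the outermost stream flushes and closes the whole stack.
        try (OutputStream out = new GZIPOutputStream(new BufferedOutputStream(sink))) {
            out.write(message.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Receiver side: mirror the stack, then pull until end of stream (-1).
    public static String receiveCompressed(InputStream source) throws IOException {
        try (InputStream in = new GZIPInputStream(new BufferedInputStream(source))) {
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] buffer = new byte[8 * 1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                collected.write(buffer, 0, read);
            }
            // Decode once at the end, with the same charset the sender used.
            return new String(collected.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    // In-memory round trip; the byte array plays the role of the network.
    public static String roundTrip(String message) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            sendCompressed(wire, message);
            return receiveCompressed(new ByteArrayInputStream(wire.toByteArray()));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("Received message: " + roundTrip("Hello World via RawChannelStream"));
    }
}
```

Both ends have to stack their streams in matching order: whatever the sender wraps around the output stream, the receiver has to unwrap from the input stream.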

Here's a simple example which you can download and adapt to your needs:

http://svn.root1.de/svn/simon/trunk/samples/src/main/java/de/root1/simon/samples/rawchannelstream/

As with any other SIMON project, there is some shared code:

Shared Code

package de.root1.simon.samples.rawchannelstream.shared;

public interface MyServer {

    public void requestStream(ClientCallback clientCallback);

}

As you can see, the server offers the client a method to request a stream. To create the link, the client needs to pass a callback object.

package de.root1.simon.samples.rawchannelstream.shared;

public interface ClientCallback {

    public int establishStream();

}

The client's callback object provides a method to establish the stream. Its int return value carries the RawChannel token (see the File Transfer Howto).

Then we have the server code:

Server Code

This is the SIMON server implementation:

package de.root1.simon.samples.rawchannelstream.server;

import de.root1.simon.RawChannel;
import de.root1.simon.RawChannelOutputStream;
import de.root1.simon.Simon;
import de.root1.simon.annotation.SimonRemote;
import de.root1.simon.samples.rawchannelstream.shared.ClientCallback;
import de.root1.simon.samples.rawchannelstream.shared.MyServer;
import java.io.IOException;

@SimonRemote(value = {MyServer.class})
public class MyServerImpl implements MyServer {

    @Override
    public void requestStream(ClientCallback clientCallback) {

        int channelToken = clientCallback.establishStream();
        RawChannel rawChannel = Simon.openRawChannel(channelToken, clientCallback);
        RawChannelOutputStream rcos = new RawChannelOutputStream(rawChannel);
        try {
            rcos.write("Hello World via RawChannelStream".getBytes());
            rcos.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

The server implementation obtains a channel token via the client's callback. With this token, SIMON knows where to send the data.
Next we tell SIMON to open a RawChannel. To get a streaming interface, we create a RawChannelOutputStream and pass it the RawChannel as the underlying layer.
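
The token handshake described above can be pictured with a toy, in-process model. This is only a conceptual sketch and not how SIMON is implemented: the class TokenHandshakeSketch and its methods prepare(), open() and demo() are hypothetical names that loosely mirror the roles of Simon.prepareRawChannel() and Simon.openRawChannel(), and a plain map replaces the network.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class TokenHandshakeSketch {

    // Hypothetical bookkeeping on the receiving side: token -> data listener.
    private final Map<Integer, Consumer<byte[]>> listeners = new ConcurrentHashMap<>();
    private final AtomicInteger nextToken = new AtomicInteger(1);

    // Receiver side: register a listener and get a token to hand to the sender.
    public int prepare(Consumer<byte[]> listener) {
        int token = nextToken.getAndIncrement();
        listeners.put(token, listener);
        return token;
    }

    // Sender side: resolve the token to the listener that will receive the data.
    public Consumer<byte[]> open(int token) {
        Consumer<byte[]> listener = listeners.get(token);
        if (listener == null) {
            throw new IllegalArgumentException("Unknown channel token: " + token);
        }
        return listener;
    }

    // Plays both roles in one process and returns what the "client" received.
    public static String demo() {
        TokenHandshakeSketch channels = new TokenHandshakeSketch();
        ByteArrayOutputStream received = new ByteArrayOutputStream();

        // "Client": prepares the channel and returns the token, like establishStream().
        int token = channels.prepare(chunk -> received.write(chunk, 0, chunk.length));

        // "Server": opens the channel with that token and pushes data through it.
        Consumer<byte[]> channel = channels.open(token);
        channel.accept("Hello ".getBytes(StandardCharsets.UTF_8));
        channel.accept("World".getBytes(StandardCharsets.UTF_8));

        return new String(received.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("Received message: " + demo());
    }
}
```

The point of the model is the order of events: the receiver registers its listener first, and only then can the sender use the token to address it.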

Now we are ready to stream data ...

And here's the code to start the server:

package de.root1.simon.samples.rawchannelstream.server;

import de.root1.simon.Registry;
import de.root1.simon.Simon;
import de.root1.simon.exceptions.NameBindingException;
import java.io.IOException;
import java.net.UnknownHostException;

public class Server {

    public static void main(String[] args) {
        try {
            System.out.println("Server started ...");
            MyServerImpl myServerImpl = new MyServerImpl();
            Registry registry = Simon.createRegistry();
            registry.start();
            System.out.println("Registry created");
            registry.bind("myServer", myServerImpl);
            System.out.println("Remote Object bound ...");

            // The server is now running. To shut it down, call these lines:
            //registry.unbind("myServer");
            //registry.stop();

        } catch (UnknownHostException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (NameBindingException ex) {
            ex.printStackTrace();
        }
    }

}

Nothing special here: create the registry, start it, and bind the server implementation.

So, we're done with the server. Here comes the client:

Client Code

package de.root1.simon.samples.rawchannelstream.client;

import de.root1.simon.RawChannelInputStream;
import de.root1.simon.Simon;
import de.root1.simon.annotation.SimonRemote;
import de.root1.simon.samples.rawchannelstream.shared.ClientCallback;
import java.io.IOException;

@SimonRemote(value={ClientCallback.class})
public class ClientCallbackImpl implements ClientCallback {

    private RawChannelInputStream rcis;

    public ClientCallbackImpl() throws IOException {
        rcis = new RawChannelInputStream();
        rxThread.start();
    }

    Thread rxThread = new Thread(){

        @Override
        public void run() {
            String message = "";
            byte[] data = new byte[8*1024];
            int read=0;
            try {

                while ((read=rcis.read(data))!=-1) {

                    message += new String(data, 0, read);
                }
                rcis.close();
                System.out.println("Received message: " +message);

            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

    };

    // register the input stream's data listener and return the channel token
    @Override
    public int establishStream() {
        return Simon.prepareRawChannel(rcis.getRawChannelDataListener(), this);
    }

}

The server uses the client's callback to establish the stream. The establishStream() method simply prepares a RawChannel, passing the RawChannelDataListener of a RawChannelInputStream instance to SIMON. The result, the channel token, is returned to the calling server.
A separate thread reads the received data. A RawChannel is normally a push concept: the sender actively pushes data to the receiver. With the streaming interface, the receiver has to pull the data from the stream instead, which is why we need an extra thread here.
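
The push-versus-pull point can be tried out without SIMON. The following is a rough sketch, assuming that a blocking read on the stream behaves like a read on a java.io pipe: PipedOutputStream stands in for the pushing sender, PipedInputStream for the pulling receiver, and a single-thread executor for the extra reader thread. The names PullReaderSketch, readFully and pushAndPull are made up for this illustration. Unlike the sample client, the sketch collects raw bytes and decodes them once at the end, so a multi-byte character split across two reads is not corrupted.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PullReaderSketch {

    // Pull side: block on read() until the sender closes the stream (-1).
    static String readFully(InputStream in) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] buffer = new byte[8 * 1024];
        int read;
        while ((read = in.read(buffer)) != -1) {
            collected.write(buffer, 0, read);
        }
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    // Pushes the chunks from the calling thread while a second thread pulls them.
    public static String pushAndPull(String... chunks) {
        ExecutorService readerThread = Executors.newSingleThreadExecutor();
        try {
            PipedOutputStream pushSide = new PipedOutputStream();
            PipedInputStream pullSide = new PipedInputStream(pushSide);

            // The reader needs its own thread, because read() blocks until data arrives.
            Callable<String> pullTask = () -> readFully(pullSide);
            Future<String> result = readerThread.submit(pullTask);

            // Closing the push side is what lets the reader see end of stream.
            try (OutputStream out = pushSide) {
                for (String chunk : chunks) {
                    out.write(chunk.getBytes(StandardCharsets.UTF_8));
                }
            }
            return result.get();
        } catch (IOException | InterruptedException | ExecutionException ex) {
            throw new IllegalStateException("Stream transfer failed", ex);
        } finally {
            readerThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Received message: " + pushAndPull("Hello ", "World"));
    }
}
```

If the sender never closes its stream, the reader loop never sees -1 and the reader thread keeps waiting, which is why closing the stream is listed as its own step at the top of this page.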

package de.root1.simon.samples.rawchannelstream.client;

import de.root1.simon.Lookup;
import de.root1.simon.Simon;
import de.root1.simon.exceptions.EstablishConnectionFailed;
import de.root1.simon.exceptions.LookupFailedException;
import de.root1.simon.exceptions.SimonRemoteException;
import de.root1.simon.samples.rawchannelstream.shared.MyServer;
import java.io.IOException;

public class Client {

    public static void main(String[] args) {
        System.out.println("Client started...");
        try {

            ClientCallbackImpl clientCallback = new ClientCallbackImpl();
            System.out.println("Doing lookup ...");
            Lookup nameLookup = Simon.createNameLookup("localhost");

            MyServer myServer = (MyServer) nameLookup.lookup("myServer");

            System.out.println("Requesting stream ...");
            myServer.requestStream(clientCallback);

            System.out.println("Stream request completed, releasing remote object");
            nameLookup.release(myServer);

        } catch (SimonRemoteException ex) {
            ex.printStackTrace();
        } catch (LookupFailedException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (EstablishConnectionFailed ex) {
            ex.printStackTrace();
        }
        System.out.println("Client terminated...");
    }

}

The rest of the client code is much like any other client: create a lookup, do the lookup, and call remote methods. Here we call requestStream() to tell the server we want a stream connection.

That's all.