Recently at work I needed to push data from a server to its clients whenever new data arrived.
The Observer design pattern was the obvious candidate.
The clients were remote but on the same company network, and no messaging service was available.
I could not find a way to use the Observer pattern over HTTP, so I chose RMI.
Implementing the Observer pattern over RMI is a bit tricky. After searching the internet,
I found that a good Samaritan had already done the job.
Here is the link I took the basic codebase from:
http://sites.google.com/site/jamespandavan/Home/java/sample-remote-observer-based-on-rmi
There is a problem with this design, though. Once you start the server and the client, the client receives the messages pushed by the server without trouble.
But if the server goes down for some reason, the client has no way of knowing. After subscribing, the client never calls the server again; it only waits for callbacks, so no RemoteException or ConnectException is ever thrown on its side.
The client keeps running, wondering why it is not getting any data from the server.
The existing client will not receive any data even after the server is back up, because the restarted server has no record of the observers that registered with the previous instance.
To solve this, I keep the timestamp of the last push the client received from the server. If more than a specified interval has passed since then, the client considers the connection dead and tries to reconnect to the server.
The reconnection attempts go on until the server comes back up. Then everything works just like before, as if nothing had happened.
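The staleness check described above can be sketched on its own, without any RMI. This is only an illustration: ConnectionWatchdog and its method names are hypothetical and do not appear in the actual code, and the timestamps are passed in explicitly so the logic is easy to follow.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of the original code): remembers when data
// last arrived and decides whether the connection should be treated as dead.
class ConnectionWatchdog {
    private final long timeoutMillis;
    private final AtomicLong lastReceived;

    ConnectionWatchdog(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastReceived = new AtomicLong(nowMillis);
    }

    // Would be called from the client's update() callback.
    void markReceived(long nowMillis) {
        lastReceived.set(nowMillis);
    }

    // True when no data has arrived for longer than the timeout.
    boolean isStale(long nowMillis) {
        return nowMillis - lastReceived.get() > timeoutMillis;
    }

    public static void main(String[] args) {
        ConnectionWatchdog watchdog = new ConnectionWatchdog(10_000, 0);
        System.out.println(watchdog.isStale(5_000));   // false: within the timeout
        System.out.println(watchdog.isStale(10_001));  // true: time to re-subscribe
        watchdog.markReceived(10_001);
        System.out.println(watchdog.isStale(15_000));  // false: data arrived again
    }
}
```

In a real client, a background thread would call isStale(System.currentTimeMillis()) periodically and re-subscribe whenever it returns true.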
A few words before sharing the codebase:
1) Don't forget to make your data serializable.
2) Generate the RMI stub class using the rmic command. Details about this command can be found on the Oracle website. (Since Java 5, RMI can generate stubs dynamically at runtime, so this step is only needed to support clients on older JVMs.)
3) After the server code is written, create a jar file containing the RMI service interface, the Model class and the RMI stub.
4) Clients running in a separate JVM have to put this jar on their classpath.
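A quick way to check the first point before involving RMI at all is to push the object through Java serialization in memory. The sketch below is an assumption-laden illustration: SampleDto is a hypothetical stand-in for the real DTO, and SerializationCheck.roundTrip is a helper name I made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCheck {
    // Writes the value to a byte array and reads it back, which is roughly
    // what RMI does when it marshals an argument for a remote call.
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        SampleDto copy = roundTrip(new SampleDto("model", 7));
        System.out.println(copy.name + " " + copy.id); // prints "model 7"
    }
}

// Hypothetical stand-in for the DTO that gets pushed to the clients.
class SampleDto implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final int id;

    SampleDto(String name, int id) {
        this.name = name;
        this.id = id;
    }
}
```

If a field of the DTO is not serializable, writeObject fails with a NotSerializableException here, which is easier to diagnose than the same failure buried inside a remote call.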
Here's the code, more or less the same as in the link above, with slight modifications:
Running the Server/Client:
java -Djava.security.policy=security.policy RmiSubjectImpl
java -Djava.security.policy=security.policy RmiObserverClient
The Security Policy file:
grant {
    permission java.security.AllPermission;
};
The POJO DTO (Don't forget the serializable part)
import java.io.Serializable;

// Data object pushed from the server to every subscribed client.
public class Model implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int id;
    private long lastTimsStamp;

    public Model(String name, int id, long lastTimsStamp) {
        this.name = name;
        this.id = id;
        this.lastTimsStamp = lastTimsStamp;
    }

    public String getName() {
        return name;
    }

    public int getId() {
        return id;
    }

    public long getLastTimsStamp() {
        return lastTimsStamp;
    }
}
Remote observer interface
import java.rmi.Remote;
import java.rmi.RemoteException;

// Callback interface that the server invokes on each subscribed client.
public interface RemoteObserver extends Remote {
    void update(Object observable, Object updateMsg) throws RemoteException;
}
RMI client program implementing the interface
import java.net.MalformedURLException;
import java.rmi.ConnectException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RMISecurityManager;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
// The shared classes are expected in the rmisubject package (the jar built from the server code).
import rmisubject.Model;
import rmisubject.RemoteObserver;
import rmisubject.RmiSubjectService;

public class RmiObserverClient extends UnicastRemoteObject implements RemoteObserver {
    private static final long serialVersionUID = 1L;

    // Local time of the last push. Volatile because the RMI callback thread
    // writes it while the checker thread reads it.
    private static volatile long lastTimeStamp;
    private static long connectionCheckInterval = 10 * 1000;
    private static RmiSubjectService remoteService;

    /**
     * If the client hasn't received any data for the span of connectionCheckInterval,
     * it will try to reconnect to the server. It will keep trying until the server is back up.
     * @param connectionCheckInterval interval in milliseconds
     * @throws RemoteException if the client object cannot be exported
     */
    public RmiObserverClient(long connectionCheckInterval) throws RemoteException {
        super();
        RmiObserverClient.connectionCheckInterval = connectionCheckInterval;
    }

    // Periodically checks whether data is still arriving and re-subscribes if not.
    Thread checkConnectionAlive = new Thread() {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(connectionCheckInterval);
                } catch (InterruptedException e) {
                    // Stop checking when the thread is interrupted.
                    Thread.currentThread().interrupt();
                    return;
                }
                long currentTimeStamp = System.currentTimeMillis();
                System.out.println("Inside client checkConnectionAlive : " + currentTimeStamp);
                if (currentTimeStamp - lastTimeStamp > connectionCheckInterval) {
                    subscribe();
                }
            }
        }
    };

    public static void main(String[] args) {
        try {
            RmiObserverClient rc = new RmiObserverClient(10 * 1000);
            rc.subscribe();
            rc.checkConnectionAlive.start();
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }

    // Looks up the remote service and registers this client as an observer.
    public void subscribe() {
        // RMISecurityManager is deprecated since Java 8; SecurityManager is its replacement.
        if (System.getSecurityManager() == null)
            System.setSecurityManager(new RMISecurityManager());
        try {
            remoteService = (RmiSubjectService) Naming.lookup("//localhost:9999/RmiService");
            remoteService.addObserver(this);
        } catch (ConnectException ex) {
            // Server is not reachable; the checker thread will try again later.
            ex.printStackTrace();
        } catch (RemoteException e) {
            e.printStackTrace();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (NotBoundException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void update(Object observable, Object updateMsg) throws RemoteException {
        System.out.println("===============start message=============");
        Model model = (Model) updateMsg;
        System.out.println(model.getName());
        System.out.println(model.getId());
        System.out.println(model.getLastTimsStamp());
        // Record the arrival time on the client's own clock, so that clock skew
        // between the two machines cannot trigger false reconnects.
        lastTimeStamp = System.currentTimeMillis();
        System.out.println("===============end message=============");
    }
}
RMI service interface
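import java.rmi.Remote;
import java.rmi.RemoteException;

// Remote interface through which clients register themselves with the server.
public interface RmiSubjectService extends Remote {
    void addObserver(RemoteObserver o) throws RemoteException;
}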
RMI server (the observable)
import java.io.Serializable;
import java.rmi.RMISecurityManager;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.Observable;
import java.util.Observer;

public class RmiSubjectImpl extends Observable implements RmiSubjectService {
    private static final long serialVersionUID = 1L;

    // Adapts a remote client to java.util.Observer so that Observable can notify it.
    private class WrappedObserver implements Observer, Serializable {
        private static final long serialVersionUID = 1L;
        private RemoteObserver ro = null;

        public WrappedObserver(RemoteObserver ro) {
            this.ro = ro;
        }

        @Override
        public void update(Observable o, Object arg) {
            try {
                ro.update(o.toString(), arg);
            } catch (RemoteException e) {
                // The client is unreachable, so stop notifying it.
                System.out.println("Remote exception removing observer:" + this);
                o.deleteObserver(this);
            }
        }
    }

    @Override
    public void addObserver(RemoteObserver o) throws RemoteException {
        WrappedObserver mo = new WrappedObserver(o);
        addObserver(mo);
        System.out.println("Added observer:" + mo);
    }

    // Simulates new data arriving: pushes a fresh Model to all observers every 5 seconds.
    Thread thread = new Thread() {
        private int i = 0;

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(5 * 1000);
                } catch (InterruptedException e) {
                    // Stop publishing when the thread is interrupted.
                    Thread.currentThread().interrupt();
                    return;
                }
                setChanged();
                notifyObservers(new Model("model", i++, System.currentTimeMillis()));
            }
        }
    };

    public RmiSubjectImpl() {
        thread.start();
    }

    public static void main(String[] args) {
        // RMISecurityManager is deprecated since Java 8; SecurityManager is its replacement.
        if (System.getSecurityManager() == null)
            System.setSecurityManager(new RMISecurityManager());
        try {
            // Create the registry and export the service on the same port.
            Registry rmiRegistry = LocateRegistry.createRegistry(9999);
            RmiSubjectService rmiService = (RmiSubjectService) UnicastRemoteObject
                    .exportObject(new RmiSubjectImpl(), 9999);
            rmiRegistry.bind("RmiService", rmiService);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
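The server side of the story is that WrappedObserver drops a client as soon as a push to it fails. Here is a small local sketch of that behaviour, with no network involved. FailingObserverDemo, Subject and SelfRemovingObserver are hypothetical names for illustration, and the "unreachable client" is simulated by throwing a RemoteException by hand.

```java
import java.rmi.RemoteException;
import java.util.Observable;
import java.util.Observer;

@SuppressWarnings("deprecation") // java.util.Observable is deprecated since Java 9
class FailingObserverDemo {
    // Hypothetical subject with a helper that publishes one message.
    static class Subject extends Observable {
        void publish(Object msg) {
            setChanged();
            notifyObservers(msg);
        }
    }

    // Stands in for WrappedObserver: a failed delivery removes the observer.
    static class SelfRemovingObserver implements Observer {
        private final boolean reachable;
        int received = 0;

        SelfRemovingObserver(boolean reachable) {
            this.reachable = reachable;
        }

        @Override
        public void update(Observable o, Object arg) {
            try {
                if (!reachable) {
                    // Simulates the remote call failing because the client is gone.
                    throw new RemoteException("client is gone");
                }
                received++;
            } catch (RemoteException e) {
                o.deleteObserver(this);
            }
        }
    }

    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.addObserver(new SelfRemovingObserver(true));
        subject.addObserver(new SelfRemovingObserver(false));
        subject.publish("first");
        System.out.println(subject.countObservers()); // prints 1
    }
}
```

This cleanup only helps when the client dies. When the server dies, its observer list is lost with it, which is why the client needs the reconnect check.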
Hope this helps. Let me know if it doesn't.