Non-blocking UDP datagram replicator
15 Feb 2010 — A class that listens on a UDP port, collects every datagram it receives, and rebroadcasts each datagram to a set of other ports. This is useful for several reasons. I use it when stress testing UDP clients, because it lets me feed 1000 client sockets while only really having a single legitimate data source.
/**
 * Listens on a UDP port and forwards a copy of every datagram it receives to
 * each configured {@link DataSinkPoint}.
 *
 * <p>Receiving happens on a background thread that is started by the
 * constructor and runs until {@link #shutdown()} is called.
 */
public class Replicator {

    private final AsyncDatagramServer aserver;

    /**
     * Starts replicating datagrams that arrive on {@code port}.
     *
     * <p>The socket is opened and bound on the background thread, so a bind
     * failure is raised on that thread (wrapped in a
     * {@link RuntimeException}) rather than thrown from this constructor.
     *
     * @param port      local UDP port to listen on
     * @param endPoints sinks that each receive a copy of every datagram; the
     *                  collection is iterated on the receive thread for every
     *                  datagram and is not copied
     * @throws NullPointerException if {@code endPoints} is {@code null}
     * @throws Exception            declared for backward compatibility
     */
    public Replicator(final int port,
            final Collection<DataSinkPoint> endPoints) throws Exception {
        if (endPoints == null) {
            throw new NullPointerException("endPoints");
        }
        aserver = new AsyncDatagramServer(port,
                new AsyncDatagramServer.AsyncDatagramServerListener() {
                    @Override
                    public void recieveDatagram(ByteBuffer buffer) {
                        fanOut(buffer, endPoints);
                    }
                });
    }

    /**
     * Hands an independent duplicate of {@code datagram} to every sink.
     *
     * <p>A failure in one sink is reported on standard error and does not
     * prevent delivery to the remaining sinks. This method never throws for
     * sink or iteration failures, so the receive loop keeps running.
     * Package-private so it can be exercised without opening sockets.
     *
     * @param datagram  buffer positioned at the start of the payload; its own
     *                  position and limit are left untouched
     * @param endPoints sinks to deliver to
     */
    static void fanOut(final ByteBuffer datagram,
            final Collection<? extends DataSinkPoint> endPoints) {
        try {
            for (final DataSinkPoint d : endPoints) {
                try {
                    // Each sink gets its own view so that consuming it cannot
                    // move the position seen by the next sink.
                    d.send(datagram.duplicate());
                } catch (final Exception e) {
                    // Isolate the failure to this sink only.
                    e.printStackTrace();
                }
            }
        } catch (final RuntimeException e) {
            // Failure while iterating the collection itself, e.g. because it
            // was modified concurrently by the caller.
            e.printStackTrace();
        }
    }

    /**
     * Non-blocking UDP receiver that runs a selector loop on its own thread
     * and passes every received datagram to a listener.
     */
    private static class AsyncDatagramServer {

        /** Capacity of the receive buffer; longer datagrams are truncated. */
        private static final int RECEIVE_BUFFER_SIZE = 2048;

        /** Upper bound, in milliseconds, on how long a shutdown request can go unnoticed. */
        private static final long SELECT_TIMEOUT_MILLIS = 500;

        /** Written by the thread calling shutdown(), read by the receive thread. */
        private volatile boolean running = true;

        /**
         * Starts the receive thread.
         *
         * @param port     local UDP port to bind
         * @param listener callback invoked on the receive thread for each datagram
         * @throws Exception declared for backward compatibility
         */
        public AsyncDatagramServer(final int port,
                final AsyncDatagramServerListener listener) throws Exception {
            new Thread() {
                @Override
                public void run() {
                    try {
                        startSocket(port, listener);
                    } catch (final Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }.start();
        }

        /** Callback for received datagrams (method name spelling kept as is). */
        public interface AsyncDatagramServerListener {
            /**
             * Called on the receive thread with a view of the receive buffer.
             * The underlying bytes are overwritten by the next datagram, so
             * they must be consumed before this method returns.
             */
            void recieveDatagram(ByteBuffer buffer);
        }

        /** Asks the receive loop to stop; it exits after its current select call returns. */
        public void shutdown() {
            running = false;
        }

        /**
         * Binds the channel and runs the receive loop until shutdown is
         * requested. The selector and channel are closed on every exit path.
         */
        private void startSocket(final int port,
                AsyncDatagramServerListener listener) throws IOException {
            final DatagramChannel serverChannel = DatagramChannel.open();
            try {
                final Selector selector = Selector.open();
                try {
                    final DatagramSocket sock = serverChannel.socket();
                    sock.setReuseAddress(true);
                    sock.bind(new InetSocketAddress(port));
                    serverChannel.configureBlocking(false);
                    serverChannel.register(selector, SelectionKey.OP_READ);
                    final ByteBuffer buffer =
                            ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);
                    while (running) {
                        // The timeout lets the loop re-check the running flag
                        // even when no traffic arrives.
                        if (selector.select(SELECT_TIMEOUT_MILLIS) > 0) {
                            processData(listener, selector, buffer);
                        }
                    }
                } finally {
                    selector.close();
                }
            } finally {
                serverChannel.close();
            }
        }

        /**
         * Receives one datagram from every readable key and passes it to the
         * listener, removing each key from the selected set afterwards.
         */
        private void processData(AsyncDatagramServerListener listener,
                Selector selector, ByteBuffer buffer) throws IOException {
            final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                final SelectionKey key = it.next();
                if (key.isReadable()) {
                    final DatagramChannel channel =
                            (DatagramChannel) key.channel();
                    buffer.clear();
                    // receive() returns null when no datagram was available.
                    if (channel.receive(buffer) != null) {
                        buffer.flip();
                        listener.recieveDatagram(buffer.duplicate());
                    }
                }
                // Selected keys must be removed by hand or they are reported again.
                it.remove();
            }
        }
    }

    /** Stops the background receive thread. */
    public void shutdown() {
        aserver.shutdown();
    }

    /** Destination for replicated datagrams. */
    public interface DataSinkPoint {
        /**
         * Delivers one datagram.
         *
         * @param buffer payload, from the buffer's position to its limit
         * @throws IOException if the datagram could not be delivered
         */
        void send(ByteBuffer buffer) throws IOException;
    }

    /** Sink that forwards datagrams to a fixed remote address over its own UDP socket. */
    private static class DataSinkPointImpl
            implements DataSinkPoint, java.io.Closeable {

        private final DatagramSocket socket;

        /**
         * Opens a socket connected to {@code address}.
         *
         * @param address remote address every datagram is sent to
         * @throws Exception if the socket cannot be opened or connected
         */
        public DataSinkPointImpl(SocketAddress address) throws Exception {
            final DatagramSocket s = new DatagramSocket();
            try {
                s.connect(address);
            } catch (final Exception e) {
                // Do not leak the socket when the connect step fails.
                s.close();
                throw e;
            }
            socket = s;
        }

        /**
         * Sends the bytes between the buffer's position and limit. The
         * buffer's position is not advanced.
         */
        @Override
        public void send(ByteBuffer buffer) throws IOException {
            final int length = buffer.remaining();
            if (buffer.hasArray()) {
                // Honour the view's offset and position instead of assuming
                // that the payload starts at index 0 of the backing array.
                socket.send(new DatagramPacket(buffer.array(),
                        buffer.arrayOffset() + buffer.position(), length));
            } else {
                // Direct and read-only buffers expose no array; copy out.
                final byte[] copy = new byte[length];
                buffer.duplicate().get(copy);
                socket.send(new DatagramPacket(copy, 0, length));
            }
        }

        /** Releases the underlying socket. */
        @Override
        public void close() {
            socket.close();
        }
    }
}
Usage:
/**
 * Usage example: replicates everything received on UDP port 3000 to the
 * 1000 local ports 4000-4999 for five seconds, then shuts down.
 *
 * @param args unused
 * @throws Exception if a sink socket cannot be created or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Collection<DataSinkPoint> endPoints = new ArrayList<DataSinkPoint>();
    for (int i = 4000; i < 5000; i++) {
        endPoints.add(new DataSinkPointImpl(
                new InetSocketAddress("127.0.0.1", i)));
    }
    final Replicator r = new Replicator(3000, endPoints);
    try {
        Thread.sleep(5000);
    } finally {
        // Stop the replicator's receive thread even when the sleep is
        // interrupted; otherwise that thread keeps running after main fails.
        r.shutdown();
    }
    System.out.println("done.");
}