Non-blocking UDP datagram replicator

A class that listens to a UDP port and collects all the datagrams and then rebroadcasts those datagrams to other ports. This is useful for several reasons. I use it when stress testing UDP clients because I can subscribe to 1000 client sockets while only really having a single legitimate datasource.

public class Replicator {
	private final AsyncDatagramServer aserver;

	public Replicator(final int port,
			final Collection<DataSinkPoint> endPoints) throws Exception {
		aserver = new AsyncDatagramServer(port,
				new AsyncDatagramServer.AsyncDatagramServerListener() {

			@Override
			public void recieveDatagram(ByteBuffer buffer) {
				try {
					for(final DataSinkPoint d: endPoints) {
						d.send(buffer.duplicate());
					}
				} catch (final Exception e) {
					e.printStackTrace();
				}
			}
		});
	}

	private static class AsyncDatagramServer {

		private boolean running = true;

		public AsyncDatagramServer(final int port,
				final AsyncDatagramServerListener listener) throws Exception {

			new Thread(){

				@Override
				public void run() {
					try {
						startSocket(port, listener);
					} catch (final Exception e) {
						throw new RuntimeException(e);
					}
				}

			}.start();

		}

		public interface AsyncDatagramServerListener {
			void recieveDatagram(ByteBuffer buffer);
		}

		public void shutdown() {
			running = false;
		}

		private void startSocket(final int port,
				AsyncDatagramServerListener listener) throws IOException,
				SocketException, ClosedChannelException, Exception {
			DatagramChannel serverChannel = DatagramChannel.open();
			Selector selector = Selector.open();
			DatagramSocket sock = serverChannel.socket();
			sock.setReuseAddress(true);
			sock.bind (new InetSocketAddress (port));
			serverChannel.configureBlocking (false);
			serverChannel.register(selector, SelectionKey.OP_READ);
			ByteBuffer buffer = ByteBuffer.allocate(2048);

			while (running) {
				if(selector.select(500) > 0) {
					processData(listener, selector, buffer);
				}
			}

			selector.close();
			serverChannel.close();
		}

		private void processData(AsyncDatagramServerListener listener,
				Selector selector, ByteBuffer buffer) throws Exception {
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();

			while (it.hasNext()) {
				final SelectionKey key = it.next();

				if (key.isReadable()) {
					DatagramChannel channel =
						(DatagramChannel) key.channel();
					buffer.clear();
					if (channel.receive(buffer) != null) {
						buffer.flip();
						listener.recieveDatagram(buffer.duplicate());
					}
				}
				it.remove();
			}
		}



	}

	public void shutdown() {
		aserver.shutdown();
	}

	public interface DataSinkPoint {
		void send(ByteBuffer buffer) throws IOException;
	}

	private static class DataSinkPointImpl implements DataSinkPoint {
		private final DatagramSocket socket;

		public DataSinkPointImpl(SocketAddress address) throws Exception {
			socket = new DatagramSocket();
			socket.connect(address);
		}

		@Override
		public void send(ByteBuffer buffer) throws IOException {
			socket.send(new DatagramPacket(buffer.array(),
					0, buffer.limit()));
		}

	}

}

Usage:

public static void main(String[] args) throws Exception {
	Collection<DataSinkPoint> endPoints = new ArrayList<DataSinkPoint>();
	for(int i = 4000; i < 5000; i++) {
		endPoints.add(new DataSinkPointImpl(
				new InetSocketAddress("127.0.0.1", i)));
	}

	final Replicator r = new Replicator(3000, endPoints);
	Thread.sleep(5000);
	r.shutdown();
	System.out.println("done.");
}