AMQP-115: refactor into separate listeners for quotes and trades

This commit is contained in:
Dave Syer
2011-03-14 11:46:27 +00:00
parent f68cb3149e
commit 69269c5195
2 changed files with 28 additions and 29 deletions

View File

@@ -51,49 +51,48 @@ public class QuoteController {
private ConcurrentMap<String, TradeResponse> responses = new ConcurrentHashMap<String, TradeResponse>();
private Queue<Quote> quotes = new PriorityBlockingQueue<Quote>(100, new QuoteComparator());
private long timeout = 30000; // 30 seconds of data
public void setStockServiceGateway(StockServiceGateway stockServiceGateway) {
this.stockServiceGateway = stockServiceGateway;
}
public void handleMessage(Object message) {
logger.info("Client received: " + message);
if (message instanceof TradeResponse) {
TradeResponse response = (TradeResponse) message;
String key = response.getRequestId();
responses.putIfAbsent(key, response);
Collection<TradeResponse> queue = new ArrayList<TradeResponse>(responses.values());
long timestamp = System.currentTimeMillis() - timeout;
for (Iterator<TradeResponse> iterator = queue.iterator(); iterator.hasNext();) {
TradeResponse tradeResponse = iterator.next();
if (tradeResponse.getTimestamp()<timestamp) {
responses.remove(tradeResponse.getRequestId());
}
public void handleTrade(TradeResponse response) {
logger.info("Client received: " + response);
String key = response.getRequestId();
responses.putIfAbsent(key, response);
Collection<TradeResponse> queue = new ArrayList<TradeResponse>(responses.values());
long timestamp = System.currentTimeMillis() - timeout;
for (Iterator<TradeResponse> iterator = queue.iterator(); iterator.hasNext();) {
TradeResponse tradeResponse = iterator.next();
if (tradeResponse.getTimestamp() < timestamp) {
responses.remove(tradeResponse.getRequestId());
}
} else if (message instanceof Quote) {
long timestamp = System.currentTimeMillis() - timeout;
for (Iterator<Quote> iterator = quotes.iterator(); iterator.hasNext();) {
Quote quote = iterator.next();
if (quote.getTimestamp()<timestamp) {
iterator.remove();
}
}
quotes.add((Quote) message);
}
}
public void handleMessage(Quote message) {
logger.info("Client received: " + message);
long timestamp = System.currentTimeMillis() - timeout;
for (Iterator<Quote> iterator = quotes.iterator(); iterator.hasNext();) {
Quote quote = iterator.next();
if (quote.getTimestamp() < timestamp) {
iterator.remove();
}
}
quotes.add(message);
}
@RequestMapping("/quotes")
@ResponseBody
public List<Quote> quotes(@RequestParam(required = false) Long timestamp) {
if (timestamp == null) {
timestamp = 0L;
}
// TODO: remove older quotes
ArrayList<Quote> list = new ArrayList<Quote>();
for (Quote quote : quotes) {
if (quote.getTimestamp()>timestamp) {
if (quote.getTimestamp() > timestamp) {
list.add(quote);
}
}
@@ -106,7 +105,7 @@ public class QuoteController {
public TradeRequest trade(@ModelAttribute TradeRequest tradeRequest) {
String ticker = tradeRequest.getTicker();
Long quantity = tradeRequest.getQuantity();
if (quantity == null || quantity<=0 || !StringUtils.hasText(ticker)) {
if (quantity == null || quantity <= 0 || !StringUtils.hasText(ticker)) {
// error
return tradeRequest;
} else {
@@ -132,7 +131,7 @@ public class QuoteController {
private static class QuoteComparator implements Comparator<Quote> {
public int compare(Quote o1, Quote o2) {
return new Long(o1.getTimestamp()-o2.getTimestamp()).intValue();
return new Long(o1.getTimestamp() - o2.getTimestamp()).intValue();
}
}

View File

@@ -37,8 +37,8 @@
<listener-container
connection-factory="connectionFactory" message-converter="jsonMessageConverter" xmlns="http://www.springframework.org/schema/rabbit">
<listener ref="quoteController" method="handleMessage"
queue-names="marketDataQueue,tradeQueue" />
<listener ref="quoteController" method="handleQuote" queue-names="marketDataQueue" />
<listener ref="quoteController" method="handleTrade" queue-names="tradeQueue" />
</listener-container>
<rabbit:admin connection-factory="connectionFactory"/>