AMQP-242, AMQP-243: Appender Fixes

Wrap routing key layout formatting call with a synchronized block because Log4J's PatternLayout is not thread-safe. Also added a simple fix to actually use the configured Layout, which it was previously ignoring.

Polishing @garyrussell
This commit is contained in:
Jon Brisbin
2012-07-17 14:28:44 -05:00
committed by Gary Russell
parent 0261937f08
commit 8b285910e1
2 changed files with 53 additions and 13 deletions

View File

@@ -6,7 +6,7 @@ apply plugin: 'idea'
buildscript {
repositories {
maven { url 'https://repo.springsource.org/plugins-release' }
maven { url 'http://repo.springsource.org/plugins-release' }
}
dependencies {
classpath 'org.springframework.build.gradle:docbook-reference-plugin:0.1.5'

View File

@@ -52,6 +52,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
* A fully-configured AmqpAppender, with every option set to their defaults, would look like this:
* </p>
* <p/>
*
* <pre>
* <code>
* log4j.appender.amqp=org.springframework.amqp.log4j.AmqpAppender
@@ -126,82 +127,107 @@ public class AmqpAppender extends AppenderSkeleton {
* Name of the exchange to publish log events to.
*/
private String exchangeName = "logs";
/**
* Type of the exchange to publish log events to.
*/
private String exchangeType = "topic";
/**
* Log4J pattern format to use to generate a routing key.
*/
private String routingKeyPattern = "%c.%p";
/**
* Log4J Layout to use to generate routing key.
*/
private Layout routingKeyLayout = new PatternLayout(routingKeyPattern);
/**
* Used to synchronize access to pattern layouts.
*/
private final Object layoutMutex = new Object();
/**
* Whether or not we've tried to declare this exchange yet.
*/
private AtomicBoolean exchangeDeclared = new AtomicBoolean(false);
/**
* Configuration arbitrary application ID.
*/
private String applicationId = null;
/**
* Where LoggingEvents are queued to send.
*/
private LinkedBlockingQueue<Event> events = new LinkedBlockingQueue<Event>();
/**
* The pool of senders.
*/
private ExecutorService senderPool = null;
/**
* How many senders to use at once. Use more senders if you have lots of log output going through this appender.
*/
private int senderPoolSize = 2;
/**
* How many times to retry sending a message if the broker is unavailable or there is some other error.
*/
private int maxSenderRetries = 30;
/**
* Retries are delayed like: N ^ log(N), where N is the retry number.
*/
private Timer retryTimer = new Timer("log-event-retry-delay", true);
/**
* RabbitMQ ConnectionFactory.
*/
private AbstractConnectionFactory connectionFactory;
/**
* RabbitMQ host to connect to.
*/
private String host = "localhost";
/**
* RabbitMQ virtual host to connect to.
*/
private String virtualHost = "/";
/**
* RabbitMQ port to connect to.
*/
private int port = 5672;
/**
* RabbitMQ user to connect as.
*/
private String username = "guest";
/**
* RabbitMQ password for this user.
*/
private String password = "guest";
/**
* Default content-type of log messages.
*/
private String contentType = "text/plain";
/**
* Default content-encoding of log messages.
*/
private String contentEncoding = null;
/**
* Whether or not to try and declare the configured exchange when this appender starts.
*/
private boolean declareExchange = false;
/**
* Used to synchronize access when creating the RabbitMQ ConnectionFactory.
*/
@@ -375,13 +401,17 @@ public class AmqpAppender extends AppenderSkeleton {
Exchange x;
if ("topic".equals(exchangeType)) {
x = new TopicExchange(exchangeName, durable, autoDelete);
} else if ("direct".equals(exchangeType)) {
}
else if ("direct".equals(exchangeType)) {
x = new DirectExchange(exchangeName, durable, autoDelete);
} else if ("fanout".equals(exchangeType)) {
}
else if ("fanout".equals(exchangeType)) {
x = new FanoutExchange(exchangeName, durable, autoDelete);
} else if ("headers".equals(exchangeType)) {
}
else if ("headers".equals(exchangeType)) {
x = new HeadersExchange(exchangeType, durable, autoDelete);
} else {
}
else {
x = new TopicExchange(exchangeName, durable, autoDelete);
}
// admin.deleteExchange(exchangeName);
@@ -472,7 +502,12 @@ public class AmqpAppender extends AppenderSkeleton {
locInfo.getLineNumber()));
}
StringBuffer msgBody = new StringBuffer(String.format("%s%n", logEvent.getRenderedMessage()));
StringBuilder msgBody;
String routingKey;
synchronized (layoutMutex) {
msgBody = new StringBuilder(layout.format(logEvent));
routingKey = routingKeyLayout.format(logEvent);
}
if (null != logEvent.getThrowableInformation()) {
ThrowableInformation tinfo = logEvent.getThrowableInformation();
for (String line : tinfo.getThrowableStrRep()) {
@@ -481,11 +516,11 @@ public class AmqpAppender extends AppenderSkeleton {
}
// Send a message
String routingKey = routingKeyLayout.format(logEvent);
try {
rabbitTemplate
.send(exchangeName, routingKey, new Message(msgBody.toString().getBytes(), amqpProps));
} catch (AmqpException e) {
rabbitTemplate.send(exchangeName, routingKey, new Message(msgBody.toString().getBytes(),
amqpProps));
}
catch (AmqpException e) {
int retries = event.incrementRetries();
if (retries < maxSenderRetries) {
// Schedule a retry based on the number of times I've tried to re-send this
@@ -495,17 +530,20 @@ public class AmqpAppender extends AppenderSkeleton {
events.add(event);
}
}, (long) (Math.pow(retries, Math.log(retries)) * 1000));
} else {
}
else {
errorHandler.error("Could not send log message " + logEvent.getRenderedMessage()
+ " after " + maxSenderRetries + " retries", e, ErrorCode.WRITE_FAILURE, logEvent);
}
} finally {
}
finally {
if (null != applicationId) {
MDC.remove(APPLICATION_ID);
}
}
}
} catch (Throwable t) {
}
catch (Throwable t) {
throw new RuntimeException(t.getMessage(), t);
}
}
@@ -517,7 +555,9 @@ public class AmqpAppender extends AppenderSkeleton {
@SuppressWarnings("rawtypes")
protected class Event {
final LoggingEvent event;
final Map properties;
AtomicInteger retries = new AtomicInteger(0);
public Event(LoggingEvent event, Map properties) {