A charset is used when converting the message body
String to a byte[] to send to AMQP.
For backwards compatibility, the appender uses the
system charset by default, but now allows the charset
name to be specified in the log4j configuration...
log4j.appender.amqp.charset=UTF-8
When using the AMQP log4j appender, it
can cause orphaned daemon threads when undeploying WARs.
Also, when exiting main() methods, apps should call
Log4jConfigurer.shutdownLogging(); in a finally block
to destroy() the connetion factory (releasing threads
used by the rabbit client).
When running a listener container with local transactions
(channelTransacted, and no external transaction manager), the
consumer's channel is bound to the thread for use by downstream
RabbitTemplates.
However, the syncronizedWithTransaction boolean was not set so
the RabbitTemplate closed the channel after its operation.
We should never close the consumer's channel.
The solution is to set the boolean when binding the resource.
In addition, when using a RabbitTransactionManager, the
RabbitResourceHolder.closeAll() method would close the consumer's
channel.
Previously, the consumer's channel was registered with a
ThreadLocal in the ConnectionFactoryUtils. This enabled
the doGetTransactionalResourceHolder method to bind the
consumer's channel.
The RabbitResourceHolder.closeAll() now examines the channels
is it closing and skips the close for the consumer's channel.
Added tests to the Local and External transaction test cases
to ensure the appropriate channel.close() calls are executed,
depending on the scenario. e.g. a local transaction with
exposeListenerChannel=false should close() the exposed channel
but not the consumer's channel.
When stopping a listener container, physically close the channel
so any queued, but not yet processed messages go back to Ready
instead of remaining un-ack'd.
Rename methods to is/setPhysicalCloseRequired()
Namespace support for SimpleListenerContainer was missing
channel-transacted attribute.
AMQP-256 Polishing
Add cross check - disallow a transacted channel when
acknowlege='NONE' (autoack in Rabbit-speak).
When a 'multiple' ack is received, we iterate over pending acks for
each listener in order to generate a discrete ack for each.
While this is synchronized on the entire pending acks map, addition
of a new pending ack can cause a ConcurrentModificationException.
sychnronize the put(); add a test to reproduce the issue and to
show the correction fixes the problem.
When using local transactions (no tx manager), the channel
should be bound to the thread to enable any upstream
RabbitTemplate operations to use the same channel.
When an external tx manager is a RabbitTransactionManager,
a different channel was bound, and the consumer channel
was not committed.
1. When invoking a MessageListener (not ChannelAware),
and exposeListenerChannel is true (default),
bind the consumer's channel to the thread to make
it available for up-stack RabbitTemplate operations.
2. When invoking a ChannelAwareMessageListener, and there
is no external tx manager, and exposeListenerChannel is
true (default), bind the channel to the thread to make
it available for up-stack RabbitTemplate operations.
3. When invoking a ChannelAwareMessageListener, and there
is no external tx manager, and exposeListenerChannel is
false, bind the new (temporary) channel to the thread to
make it available for up-stack RabbitTemplate operations.
Note that work on this temporary channel is committed on
the return and txSize has no bearing.
4. Whenever there is an external transaction manager, the
exposeListenerChannel is ignored - it is always exposed in
that case this is already documented in javadoc - add a
WARN log to the initialization code.
5. When there is an external transaction manager, (but not
a RabbitTransactionManager) the
listener's channel is always bound to the thread. This was
always the case and is mentioned here for completeness.
6. Previously, when an external transaction manager was
a RabbitTransactionManager, the wrong channel was bound
to the thread, and it was that channel that was committed,
instead of the listener channel. This was the originally
reported issue, with the workaround being to remove
the transaction manager; but that caused no channel to
be bound.
This last problem was due to the fact that the transaction
template obtained a new connection and bound it. It was
that connection that the TxManager committed.
Solution was to add a ThreadLocal to ConnectionFactoryUtils
to hold consumer channels. When the transaction starts,
the utils now look at this ThreadLocal before creating
a new connection.
The connection is unregistered when the consumer exits.
Add tests for transacted (with/without txMgr) for MessageListener
and ChannelAwareMessageListener to ensure the channel
is bound for use by up-stack templates.
Add test with Rabbit TxMgr showing correct channel
is bound and committed.
AMQP-260 Polishing
Rabbit can piggy-back confirms - for example, if seq 1, 2, 3 are
sent, it is possible to receive ack #3 with 'multiple' set. These
means 1, 2, and 3 are acked.
This worked fine with just one listener. However, if two or
more listeners (e.g. rabbit templates) are attached, only the
listener for ack #3 is notified (regardless of whether all
the acks belong to it).
The PublisherCallbackChannel maintains two maps: seq-to-listener
and listener-to-map(seq-to-correlation).
This fixes the problem by first finding all the listeners that
have pending confirms at or below the sequence number; and then
uses the second map to send the confirms to the appropriate
listener.
AMQP-255 Polishing
PR Comments
The RetryInterceptor requires messages to have an Id. It does not
work in an environent where some messages have an Id and some don't.
This adds an advice to enhance incoming messages with an id if it is
missing. Such messages cannot participate in the normal retry logic
(backoff policy etc), but allows 1 retry (until the redelivered
header is set). For a redelivered message without an Id, that fails,
the advice throws an AmqpRejectAndDontRequeueException, signaling the
container to tell the broker to stop delivering the message.
The broker can be configured to forward such messages to the Dead
Letter Exchange.
Add tests to verify messages with and without Ids use the
appropriated retry mechanism.
Also adds a RejectAndDontRequeueRecoverer. The default
MessageRecoverer simply eats the failed message; this recoverer
throws the message back to the broker enabling DLE/DLQ processing.
Wrap routing key layout formatting call with a synchronized block because Log4J's PatternLayout is not thread-safe. Also added a simple fix to actually use the configured Layout, which it was previously ignoring.
Polishing @garyrussell
Previously, the temporary reply channel used for send and
receive operations (when not using a reply-queue) were configured
for acks, but no ack was sent. The queue is temporary so was
removed when the consumer was cancelled but, with cached channels,
the admin UI showed an unacked count for the channel.
In addition, there was a race condition that could cause a hung
thread. The handover to the calling thread was done using a
SynchronousQueue; if the caller timed out just as the reply
arrived, the consumer thread could hang on the put.
Changed the temporary reply queue declaration to use auto-ack
and changed the SynchronousQueue to an ArrayBlockingQueue.
If the reply queue was defined with an id (rather than name)
attribute, the template's reply container listened on
the wrong queue name.
The template parser set the container's queues attribute
to the reply-queue attribute value, instead of a
RuntimeBeanReference to a bean of that name.
When a <queue name="foo"/> was used, it worked because
the Queue that Spring created during property assignment
had the same name and all was fine (the container uses
the queue name).
Changed the parser to correctly use a bean reference instead.
When the RabbitTemplate is invoked with an existing transaction,
the channel is bound to the thread, and
ConnectionFactoryUtils.releaseResources(resourceHolder);
is called after processing. There was a commented-out line
in ConnectionFactoryUtils.RabbitResourceSynchronization.afterCompletion()
that would have reset the synchronized state so that
releaseResouces() would "close" the chanel/connection (return
them to the caching factory).
Being commented out, the channel was never removed, and each
transactional template call grabbed a new channel.
However, uncommenting causes issues with the listener container
because it continues to use the channel, so it must not
be closed (made available for reuse).
Added code to reset synch, by default, but not for the listener.
The listener sets a boolean releaseAfterCompletion in the
ResourceHolder to false so that the channel remains and
is not closed (logically or otherwise).
log4j.appender.amqp.generateId=true
Turns on id generation - useful when configuring a
retry interceptor on the consuming side.
AMQP-253 Polishing
Fix default value; move to Message Properties.
* Remove POMs from Project Trees After Gradle Migration
* Remove .springBeans files
For reference: https://jira.springsource.org/browse/AMQP-246
AMQP-246 - Adding .springBeans back to .gitignore
AMQP-83 Reference Doc + JavaDoc fixes - Dist fixes
* Use Docbook XSDs rather than DTDs --> substantial speedup
* For Java Doc follow the same structure as Spring Integration
* Make sure the distribution package's files match the files from the Maven build, e.g. Schema files, license files etc
AMQP-83 - Rename src/docbkx to src/reference
* Remove src/dist/distribution.xml
AMQP-83 Gradle Conversion - Cleanup
* Delete unnecessary Docbook resources
* Update Wrapper to Gradle 1.0 RC3
AMQP-83 - Remove Mavem POM files
AMQP-83 - Update Spring version to 3.0.7 as per AMQP-238
AMQP-83 Rebase with latest commits from master (After 1.1.1 release)
AMQP-83 Code Review
* Remove Bundlor Plugin
* Update README.md
* verify created *pom.xml* files after *gradle install* execution that dependencies match
* ensure that *dist* task is executed by default