admin管理员组

文章数量:814266

activemq保证消息顺序原理

有时候我们需要消费者消费消息是顺序消费的。比如生成一个订单,先扣库存,然后扣款,这两条消息,由于现在系统都是分布式的,我们可能需要在有多台机器的多个消费者时,这两条消息是顺序消费的。在activemq中,有两种方式来保证,消息消费的顺序性。

1)通过高级特性consumer独有消费者(exclusive consumer)

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

下面我们从代码层面分析一下原理
其实找到Queue.java里面的doActualDispatch这个方法就可以啦

    /*** @return list of messages that could get dispatched to consumers if they*         were not full.*/private PendingList doActualDispatch(PendingList list) throws Exception {List<Subscription> consumers;consumersLock.readLock().lock();try {if (this.consumers.isEmpty()) {// slave dispatch happens in processDispatchNotificationreturn list;}consumers = new ArrayList<Subscription>(this.consumers);} finally {consumersLock.readLock().unlock();}Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {MessageReference node = iterator.next();Subscription target = null;for (Subscription s : consumers) {if (s instanceof QueueBrowserSubscription) {continue;}if (!fullConsumers.contains(s)) {if (!s.isFull()) {if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {// Dispatch it.s.add(node);LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());iterator.remove();target = s;break;}} else {// no further dispatch of list to a full consumer to// avoid out of order message receiptfullConsumers.add(s);LOG.trace("Subscription full {}", s);}}}if (target == null && node.isDropped()) {iterator.remove();}// return if there are no consumers or all consumers are fullif (target == null && consumers.size() == fullConsumers.size()) {return list;}// If it got dispatched, rotate the consumer list to get round robin// distribution.if (target != null && !strictOrderDispatch && consumers.size() > 1&& !dispatchSelector.isExclusiveConsumer(target)) {consumersLock.writeLock().lock();try {if (removeFromConsumerList(target)) {addToConsumerList(target);consumers = new ArrayList<Subscription>(this.consumers);}} finally {consumersLock.writeLock().unlock();}}}return list;}

找到最重要的一行代码

                        if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {

在分析dispatchSelector.canSelect(s, node)这个这个函数

    public boolean canSelect(Subscription subscription,MessageReference m) throws Exception {boolean result = !paused && super.canDispatch(subscription, m);if (result && !subscription.isBrowser()) {result = exclusiveConsumer == null || exclusiveConsumer == subscription;}return result;}

上面的代码很明显是说如果是独占消费者,并且是循环里面的当前消费者,或者没有独占消费者。则循环里面的当前消费者即被选中能够消费该条消息。

2)利用Activemq的高级特性:messageGroups

    Message Groups特性是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息,直到:

2.1) Consumer被关闭
2.2) Message group被关闭,通过发送一个消息,并设置这个消息的JMSXGroupSeq为-1
下面分析一下实现代码,其实还是上面的最重要的那句话
if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
其中的assignMessageGroup(s, (QueueMessageReference)node)这个判断

    protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {boolean result = true;// Keep message groups together.String groupId = node.getGroupID();int sequence = node.getGroupSequence();if (groupId != null) {
// 先查找该queue存储的一个groupId,和consumerId的一个mapMessageGroupMap messageGroupOwners = getMessageGroupOwners();// If we can own the first, then no-one else should own the// rest.// 如果是该组的第一条消息。则指定该consumer消费该消息组if (sequence == 1) {assignGroup(subscription, messageGroupOwners, node, groupId);} else {// Make sure that the previous owner is still valid, we may// need to become the new owner.ConsumerId groupOwner;groupOwner = messageGroupOwners.get(groupId);if (groupOwner == null) {assignGroup(subscription, messageGroupOwners, node, groupId);} else {if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {// A group sequence < 1 is an end of group signal.if (sequence < 0) {messageGroupOwners.removeGroup(groupId);subscription.getConsumerInfo().decrementAssignedGroupCount(destination);}} else {// 说明该消费者不能消费该消息组result = false;}}}}return result;}

还有一个比较有意思的一个点也是doActualDispatch里面的代码

                    if (removeFromConsumerList(target)) {addToConsumerList(target);consumers = new ArrayList<Subscription>(this.consumers);}

先从消费者队列里面删除,然后在加到消费者队列里面,看一下addToConsumerList这个方法

    private void addToConsumerList(Subscription sub) {if (useConsumerPriority) {consumers.add(sub);Collections.sort(consumers, orderedCompare);} else {consumers.add(sub);}}

里面的排序方法

    private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {@Overridepublic int compare(Subscription s1, Subscription s2) {// We want the list sorted in descending orderint val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();if (val == 0 && messageGroupOwners != null) {// then ascending order of assigned message groups to favour less loaded consumers// Long.compare in jdk7long x = s1.getConsumerInfo().getAssignedGroupCount(destination);long y = s2.getConsumerInfo().getAssignedGroupCount(destination);val = (x < y) ? -1 : ((x == y) ? 0 : 1);}return val;}};

可以看到消费者实际上根据两个维度排序了,一个是消费者的Priority,即消费者的优先级。还有一个是消费者的指定的消息组的个数AssignedGroupCount。这个顺序直接影响到下一条小时是谁来接收。

本文标签: activemq保证消息顺序原理