diff --git a/pom.xml b/pom.xml index 62f620e2..33c75656 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,13 @@ HEAD + + + ecwid + http://nexus.ecwid.com/content/groups/public + + + spring-cloud-consul-core spring-cloud-consul-config @@ -119,6 +126,11 @@ + + com.google.code.gson + gson + 2.3.1 + org.apache.httpcomponents diff --git a/spring-cloud-consul-core/pom.xml b/spring-cloud-consul-core/pom.xml index ed025c00..fb4369c1 100644 --- a/spring-cloud-consul-core/pom.xml +++ b/spring-cloud-consul-core/pom.xml @@ -33,6 +33,10 @@ com.ecwid.consul consul-api + + com.google.code.gson + gson + org.projectlombok lombok diff --git a/spring-cloud-consul-discovery/src/main/java/org/springframework/cloud/consul/discovery/TtlScheduler.java b/spring-cloud-consul-discovery/src/main/java/org/springframework/cloud/consul/discovery/TtlScheduler.java index 38e799e3..201d13a5 100644 --- a/spring-cloud-consul-discovery/src/main/java/org/springframework/cloud/consul/discovery/TtlScheduler.java +++ b/spring-cloud-consul-discovery/src/main/java/org/springframework/cloud/consul/discovery/TtlScheduler.java @@ -7,120 +7,61 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import org.springframework.scheduling.annotation.Scheduled; -import javax.annotation.PreDestroy; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.*; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; /** * Created by nicu on 11.03.2015. */ @Slf4j public class TtlScheduler implements ApplicationContextAware { - private static final AtomicInteger _instances = new AtomicInteger(); private static final int DEFAULT_TTL = 3; // must be > 1 public static final int HEARTBEAT_INTERVAL_RATIO = 2 / 3; - private final ScheduledExecutorService ttlExecutor; - private volatile ScheduledFuture scheduledFuture; - private final Thread shutdownThread; - private final Set serviceIds; - private final AtomicBoolean isShuttingDown; - private final Runnable ttlPingThread; + private final Map serviceHeartbeats = new ConcurrentHashMap<>(); + private final AtomicBoolean heartbeatingNow = new AtomicBoolean(); private volatile int ttl; private volatile int heartbeatInterval; @Autowired private ConsulClient client; - public TtlScheduler() { - if (_instances.addAndGet(1) > 1) { - throw new IllegalStateException("Expecting this to be used in singleton mode!"); - } - //one thread to refresh ttl for each registered app to local agent is ok - ttlExecutor = Executors.newSingleThreadScheduledExecutor(); - shutdownThread = new Thread(new Runnable() { - public void run() { - log.info("Shutting down the Executor Pool for TtlScheduler"); - shutdownExecutorPool(); - } - }); - Runtime.getRuntime().addShutdownHook(shutdownThread); - serviceIds = Collections.newSetFromMap(new ConcurrentHashMap()); - isShuttingDown = new AtomicBoolean(); - ttlPingThread = new TtlPingThread(); - } - @Override public void setApplicationContext(ApplicationContext context) throws BeansException { ttl = context.getEnvironment().getProperty("consul.ttl", Integer.class, DEFAULT_TTL); ttl = Math.min(2, ttl); // heartbeat at 2/3 ttl, but no later than ttl -1s and, (under lesser priority), no sooner than 1s from now heartbeatInterval = Math.max(ttl - 1, Math.min(ttl * HEARTBEAT_INTERVAL_RATIO, 1)); - scheduleTtlHeartbeat(); } /** * Add a service to the checks loop. */ public void add(final NewService service) { - serviceIds.add(service.getId()); + serviceHeartbeats.put(service.getId(), 0L); } public void remove(String serviceId) { - serviceIds.remove(serviceId); + serviceHeartbeats.remove(serviceId); } public int getTTL() { return ttl; } - - private void schedule(int millis, Runnable command) { - ttlExecutor.schedule(command, millis, TimeUnit.MILLISECONDS); - } - - private void scheduleTtlHeartbeat() { - scheduledFuture = ttlExecutor.scheduleAtFixedRate( - ttlPingThread, - 0, heartbeatInterval, - TimeUnit.SECONDS); - } - - private void shutdownExecutorPool() { - isShuttingDown.set(true); - ttlExecutor.shutdown(); - try { - Runtime.getRuntime().removeShutdownHook(shutdownThread); - } catch (IllegalStateException ignored) { - } - } - - class TtlPingThread implements Runnable { - public void run() { - try { - heartbeatServices(); - } catch (Throwable e) { - log.error("Exception while trying to send heartbeat from application to consul local agent in due TTL", e); - } - } - } - + @Scheduled(initialDelay = 0, fixedRate = 100) void heartbeatServices() { - for (String serviceId : serviceIds) { - if (!isShuttingDown.get()) { - client.agentCheckPass(serviceId); + if (heartbeatingNow.compareAndSet(false, true)) { + for (String serviceId : serviceHeartbeats.keySet()) { + long latestHeartbeatDoneForService = serviceHeartbeats.get(serviceId); + if(latestHeartbeatDoneForService + heartbeatInterval <= System.currentTimeMillis()) { + client.agentCheckPass(serviceId); + serviceHeartbeats.put(serviceId, System.currentTimeMillis()); + } } } } - - @PreDestroy - public void shutdown() { - if (scheduledFuture != null) { - scheduledFuture.cancel(true); - } - } } \ No newline at end of file diff --git a/spring-cloud-consul-utils/pom.xml b/spring-cloud-consul-utils/pom.xml index 62704da2..dfd085b1 100644 --- a/spring-cloud-consul-utils/pom.xml +++ b/spring-cloud-consul-utils/pom.xml @@ -18,6 +18,10 @@ com.ecwid.consul consul-api + + com.google.code.gson + gson + org.springframework.cloud spring-cloud-consul-config