public interface LifecycleAware {
/**
* <p>
* Starts a service or component.
* </p>
* <p>
* Implementations should determine the result of any start logic and effect
* the return value of {@link #getLifecycleState()} accordingly.
* </p>
*
* @throws LifecycleException
* @throws InterruptedException
*/
public void start();
/**
* <p>
* Stops a service or component.
* </p>
* <p>
* Implementations should determine the result of any stop logic and effect
* the return value of {@link #getLifecycleState()} accordingly.
* </p>
*
* @throws LifecycleException
* @throws InterruptedException
*/
public void stop();
/**
* <p>
* Return the current state of the service or component.
* </p>
*/
public LifecycleState getLifecycleState();
}
生命周期的状态:
public enum LifecycleState {
IDLE, START, STOP, ERROR;
}
SourceRunner
/**
* A source runner controls how a source is driven.
*
* This is an abstract class used for instantiating derived classes.
*/
public abstract class SourceRunner implements LifecycleAware {
private Source source;
/**
* Static factory method to instantiate a source runner implementation that
* corresponds to the type of {@link Source} specified.
*
* @param source The source to run
* @return A runner that can run the specified source
* @throws IllegalArgumentException if the specified source does not implement
* a supported derived interface of {@link SourceRunner}.
*/
public static SourceRunner forSource(Source source) {
SourceRunner runner = null;
//根据source类型选择具体的SourceRunner
if (source instanceof PollableSource) {
runner = new PollableSourceRunner();
((PollableSourceRunner) runner).setSource((PollableSource) source);
} else if (source instanceof EventDrivenSource) {
runner = new EventDrivenSourceRunner();
((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
} else {
throw new IllegalArgumentException("No known runner type for source "
+ source);
}
return runner;
}
public Source getSource() {
return source;
}
public void setSource(Source source) {
this.source = source;
}
}
PollableSourceRunner会驱动PollableSource。start方法通过调用source类的ChannelProcessor的initialize(),完成拦截器链的初始化,并调用source的start()。然后后台启动一个PollingRunner线程,完成轮询操作。如果未能拉取到数据,则会BACKOFF,线程sleep时间如下:
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
/**
* <p>
* An implementation of {@link SourceRunner} that can drive a
* {@link PollableSource}.
* </p>
* <p>
* A {@link PollableSourceRunner} wraps a {@link PollableSource} in the required
* run loop in order for it to operate. Internally, metrics and counters are
* kept such that a source that returns a {@link PollableSource.Status} of
* {@code BACKOFF} causes the run loop to do exactly that. There's a maximum
* backoff period of 500ms. A source that returns {@code READY} is immediately
* invoked. Note that {@code BACKOFF} is merely a hint to the runner; it need
* not be strictly adhered to.
* </p>
*/
public class PollableSourceRunner extends SourceRunner {
private AtomicBoolean shouldStop;
private CounterGroup counterGroup;//metrics以及计数信息
private PollingRunner runner;
private Thread runnerThread;
private LifecycleState lifecycleState;
public PollableSourceRunner() {
shouldStop = new AtomicBoolean();
counterGroup = new CounterGroup();
lifecycleState = LifecycleState.IDLE;
}
@Override
public void start() {
PollableSource source = (PollableSource) getSource();
ChannelProcessor cp = source.getChannelProcessor();
cp.initialize();//拦截器链初始化
source.start();
runner = new PollingRunner();
runner.source = source;
runner.counterGroup = counterGroup;
runner.shouldStop = shouldStop;
runnerThread = new Thread(runner);
runnerThread.setName(getClass().getSimpleName() + "-" +
source.getClass().getSimpleName() + "-" + source.getName());
runnerThread.start();//启动PollingRunner线程
lifecycleState = LifecycleState.START;
}
@Override
public void stop() {
runner.shouldStop.set(true);
try {
runnerThread.interrupt();
runnerThread.join();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for polling runner to stop. Please report this.", e);
Thread.currentThread().interrupt();
}
Source source = getSource();
source.stop();
ChannelProcessor cp = source.getChannelProcessor();
cp.close();
lifecycleState = LifecycleState.STOP;
}
@Override
public LifecycleState getLifecycleState() {
return lifecycleState;
}
public static class PollingRunner implements Runnable {
private PollableSource source;
private AtomicBoolean shouldStop;
private CounterGroup counterGroup;
@Override
public void run() {
logger.debug("Polling runner starting. Source:{}", source);
while (!shouldStop.get()) {//是否关闭
counterGroup.incrementAndGet("runner.polls");
try {
//调用process(),尝试从source拉取item,并发送给channel
if (source.process().equals(PollableSource.Status.BACKOFF)) {
//没有拉取到item
counterGroup.incrementAndGet("runner.backoffs");
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
logger.info("Source runner interrupted. Exiting");
counterGroup.incrementAndGet("runner.interruptions");
} catch (EventDeliveryException e) {
logger.error("Unable to deliver event. Exception follows.", e);
counterGroup.incrementAndGet("runner.deliveryErrors");
} catch (Exception e) {
counterGroup.incrementAndGet("runner.errors");
logger.error("Unhandled exception, logging and sleeping for " +
source.getMaxBackOffSleepInterval() + "ms", e);
try {
Thread.sleep(source.getMaxBackOffSleepInterval());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
}
}
}