长轮询

概述

长轮询(Long Polling)是一种客户端从服务器获取数据的技术。它属于一种服务器推送技术(Server Push),旨在减少服务器和客户端之间的延迟即保证实时性。与传统的轮询不同,长轮询通过让服务器保持HTTP连接一段时间来等待有新数据时再响应客户端请求,从而减少频繁的请求和响应。

普通轮询(Polling)

  • 客户端定时向服务器发送请求,询问是否有新数据。
  • 每次请求无论是否有新数据,服务器都会立即响应。
  • 频繁的请求会增加服务器的负担和网络流量。

长轮询(Long Polling)

  • 客户端发送请求后,服务器不会立即响应,而是保持连接直到有新数据或达到超时。
  • 当有新数据时,服务器立即响应并返回数据;如果超时则返回空响应。
  • 减少了请求次数,从而降低服务器负担和网络流量。

优缺点

优点

  • 减少了客户端和服务器之间的频繁通信,降低了服务器负担。
  • 相比普通轮询,长轮询能够更及时地向客户端推送数据,减少延迟。
  • 实现相对简单,不需要额外的协议支持。

缺点

  • 每个长轮询请求会保持一个HTTP连接,占用服务器资源,如果有大量的并发请求,可能导致服务器连接耗尽。
  • 长时间保持连接可能对防火墙和代理服务器不友好,某些环境下可能不稳定。
  • 实时性比不上WebSocket等更先进的技术。

适用场景

长轮询适用于以下场景:

  • 数据更新不频繁,但需要及时性较高的应用,如社交网络的消息提醒、新闻推送。
  • 不需要高实时性但仍需要减少轮询频率的应用。
  • 不适合使用WebSocket等更复杂技术的简单应用场景。
  • 初始实现和部署需要快速见效的场景。

例如微信扫描、支付、配置中心配置监听等场景

具体实现

同步阻塞方式

同步阻塞方式,利用同步队列实现,但是会导致来一个请求占用一个线程,资源利用率极其低效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@RestController
public class SyncLongPollingController {

// 使用阻塞队列来存储消息
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();

// 模拟消息生产者
public void produceMessage(String message) {
messageQueue.offer(message);
}

@GetMapping("/sync-long-polling")
public String longPolling() {
try {
// 等待消息,如果超过30秒则返回空消息
String message = messageQueue.poll(30, TimeUnit.SECONDS);
if (message == null) {
return "No new messages";
} else {
return "New message: " + message;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Error occurred";
}
}
}

异步方式

长轮询挂起请求,等待新数据更新后再响应的处理方式虽然能减少浏览器的请求次数,并带来即时性,但是如果使用同步处理请求的方式,挂起请求则代表了线程的阻塞,有多少长轮询请求未响应就代表要阻塞多少个Servlet线程,这显然是不合理的,所以长轮询必须使用异步处理的方式,而Servlet 3.0开始已经支持了异步处理

image-20240709215356638

大致流程:

  1. 客户端请求
    • 用户发起一个HTTP请求。
  2. 主线程处理
    • 请求进入主线程池(Tomcat容器线程池)。
    • 主线程调用request.startAsync()启动异步处理,并将请求挂起进入请求池。
  3. 注册超时任务
    • 将异步请求对象(AsyncContext)注册到ScheduledThreadPoolExecutor中,设置超时(如60秒)。
    • 在超时时间内,如果没有新数据,将触发超时任务,返回”无新消息”给客户端。
  4. 守护线程监听
    • 守护线程监听事件队列,当有新消息时,取出新数据。
    • 将新消息分配给线程池中的线程执行,并将响应返回给客户端。
  5. 请求处理
    • 线程池中的线程处理新消息,调用AsyncContext.complete()方法,结束异步处理。
    • 主线程释放,返回到线程池。
  6. 客户端响应
    • 客户端收到响应,流程结束。

实现代码

模拟一个配置中心的配置监听实现

客户端代码:发送长轮询请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Slf4j
public class ConfigClientWorker {
private final CloseableHttpClient httpClient;
private final ScheduledExecutorService executorService;
public ConfigClientWorker(String url, String dataId) {
this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setName("client.worker.executor-%d");
thread.setDaemon(true);
return thread;
});
// httpClient 客户端超时时间要大于长轮询约定的超时时间
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(70000).build();
this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
executorService.execute(new LongPollingRunnable(url, dataId));
}
class LongPollingRunnable implements Runnable {
private final String url;
private final String dataId;
public LongPollingRunnable(String url, String dataId) {
this.url = url;
this.dataId = dataId;
}

@SneakyThrows
@Override
public void run() {
String endpoint = url + "?dataId=" + dataId;
log.info("endpoint: {}", endpoint);
HttpGet request = new HttpGet(endpoint);
CloseableHttpResponse response = httpClient.execute(request);
switch (response.getStatusLine().getStatusCode()) {
case 200: {
BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
.getContent()));
StringBuilder result = new StringBuilder();
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
response.close();
String configInfo = result.toString();
log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
break;
}
// 304 响应码标记配置未变更
case 304: {
log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
break;
}
default: {
throw new RuntimeException("unExcepted HTTP status code");
}
}
executorService.execute(this);
}
}

public static void main(String[] args) throws IOException {
new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
System.in.read();
}
}

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@RestController
@Slf4j
@SpringBootApplication
public class ConfigServer {

@Data
private static class AsyncTask {
// 长轮询请求的上下文,包含请求和响应体
private AsyncContext asyncContext;
// 超时标记
private boolean timeout;

public AsyncTask(AsyncContext asyncContext, boolean timeout) {
this.asyncContext = asyncContext;
this.timeout = timeout;
}
}

// guava 提供的多值 Map,一个 key 可以对应多个 value
private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());

private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
.build();
private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);

// 配置监听接入点
@RequestMapping("/listener")
public void addListener(HttpServletRequest request, HttpServletResponse response) {

String dataId = request.getParameter("dataId");

// 开启异步!!!
AsyncContext asyncContext = request.startAsync(request, response);
AsyncTask asyncTask = new AsyncTask(asyncContext, true);

// 维护 dataId 和异步请求上下文的关联
dataIdContext.put(dataId, asyncTask);

// 启动定时器,60s 后写入 304 响应
timeoutChecker.schedule(() -> {
if (asyncTask.isTimeout()) {
dataIdContext.remove(dataId, asyncTask);
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
// 标志此次异步线程完成结束!!!
asyncContext.complete();
}
}, 60000, TimeUnit.MILLISECONDS);
}

// 配置发布接入点
@RequestMapping("/publishConfig")
@SneakyThrows
public String publishConfig(String dataId, String configInfo) {
log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
for (AsyncTask asyncTask : asyncTasks) {
asyncTask.setTimeout(false);
HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(configInfo);
asyncTask.getAsyncContext().complete();
}
return "success";
}

public static void main(String[] args) {
SpringApplication.run(ConfigServer.class, args);
}
}