长轮询
概述
长轮询(Long Polling)是一种客户端从服务器获取数据的技术。它属于一种服务器推送技术(Server Push),旨在减少服务器和客户端之间的延迟即保证实时性。与传统的轮询不同,长轮询通过让服务器保持HTTP连接一段时间来等待有新数据时再响应客户端请求,从而减少频繁的请求和响应。
普通轮询(Polling)
- 客户端定时向服务器发送请求,询问是否有新数据。
- 每次请求无论是否有新数据,服务器都会立即响应。
- 频繁的请求会增加服务器的负担和网络流量。
长轮询(Long Polling)
- 客户端发送请求后,服务器不会立即响应,而是保持连接直到有新数据或达到超时。
- 当有新数据时,服务器立即响应并返回数据;如果超时则返回空响应。
- 减少了请求次数,从而降低服务器负担和网络流量。
优缺点
优点
- 减少了客户端和服务器之间的频繁通信,降低了服务器负担。
- 相比普通轮询,长轮询能够更及时地向客户端推送数据,减少延迟。
- 实现相对简单,不需要额外的协议支持。
缺点
- 每个长轮询请求会保持一个HTTP连接,占用服务器资源,如果有大量的并发请求,可能导致服务器连接耗尽。
- 长时间保持连接可能对防火墙和代理服务器不友好,某些环境下可能不稳定。
- 实时性比不上WebSocket等更先进的技术。
适用场景
长轮询适用于以下场景:
- 数据更新不频繁,但需要及时性较高的应用,如社交网络的消息提醒、新闻推送。
- 不需要高实时性但仍需要减少轮询频率的应用。
- 不适合使用WebSocket等更复杂技术的简单应用场景。
- 初始实现和部署需要快速见效的场景。
例如微信扫描、支付、配置中心配置监听等场景
具体实现
同步阻塞方式
同步阻塞方式,利用同步队列实现,但是会导致来一个请求占用一个线程,资源利用率极其低效
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @RestController public class SyncLongPollingController {
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
public void produceMessage(String message) { messageQueue.offer(message); }
@GetMapping("/sync-long-polling") public String longPolling() { try { String message = messageQueue.poll(30, TimeUnit.SECONDS); if (message == null) { return "No new messages"; } else { return "New message: " + message; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return "Error occurred"; } } }
|
异步方式
长轮询挂起请求,等待新数据更新后再响应的处理方式虽然能减少浏览器的请求次数,并带来即时性,但是如果使用同步处理请求的方式,挂起请求则代表了线程的阻塞,有多少长轮询请求未响应就代表要阻塞多少个Servlet线程,这显然是不合理的,所以长轮询必须使用异步处理的方式,而Servlet 3.0开始已经支持了异步处理

大致流程:
- 客户端请求:
- 主线程处理:
- 请求进入主线程池(Tomcat容器线程池)。
- 主线程调用
request.startAsync()
启动异步处理,并将请求挂起进入请求池。
- 注册超时任务:
- 将异步请求对象(AsyncContext)注册到
ScheduledThreadPoolExecutor
中,设置超时(如60秒)。
- 在超时时间内,如果没有新数据,将触发超时任务,返回”无新消息”给客户端。
- 守护线程监听:
- 守护线程监听事件队列,当有新消息时,取出新数据。
- 将新消息分配给线程池中的线程执行,并将响应返回给客户端。
- 请求处理:
- 线程池中的线程处理新消息,调用
AsyncContext.complete()
方法,结束异步处理。
- 主线程释放,返回到线程池。
- 客户端响应:
实现代码
模拟一个配置中心的配置监听实现
客户端代码:发送长轮询请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| @Slf4j public class ConfigClientWorker { private final CloseableHttpClient httpClient; private final ScheduledExecutorService executorService; public ConfigClientWorker(String url, String dataId) { this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = new Thread(runnable); thread.setName("client.worker.executor-%d"); thread.setDaemon(true); return thread; }); RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(70000).build(); this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); executorService.execute(new LongPollingRunnable(url, dataId)); } class LongPollingRunnable implements Runnable { private final String url; private final String dataId; public LongPollingRunnable(String url, String dataId) { this.url = url; this.dataId = dataId; } @SneakyThrows @Override public void run() { String endpoint = url + "?dataId=" + dataId; log.info("endpoint: {}", endpoint); HttpGet request = new HttpGet(endpoint); CloseableHttpResponse response = httpClient.execute(request); switch (response.getStatusLine().getStatusCode()) { case 200: { BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity() .getContent())); StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); } response.close(); String configInfo = result.toString(); log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo); break; } case 304: { log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId); break; } default: { throw new RuntimeException("unExcepted HTTP status code"); } } executorService.execute(this); } } public static void main(String[] args) throws IOException { new ConfigClientWorker("http://127.0.0.1:8080/listener", "user"); System.in.read(); } }
|
服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| @RestController @Slf4j @SpringBootApplication public class ConfigServer {
@Data private static class AsyncTask { private AsyncContext asyncContext; private boolean timeout;
public AsyncTask(AsyncContext asyncContext, boolean timeout) { this.asyncContext = asyncContext; this.timeout = timeout; } }
private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d") .build(); private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
@RequestMapping("/listener") public void addListener(HttpServletRequest request, HttpServletResponse response) {
String dataId = request.getParameter("dataId");
AsyncContext asyncContext = request.startAsync(request, response); AsyncTask asyncTask = new AsyncTask(asyncContext, true);
dataIdContext.put(dataId, asyncTask);
timeoutChecker.schedule(() -> { if (asyncTask.isTimeout()) { dataIdContext.remove(dataId, asyncTask); response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); asyncContext.complete(); } }, 60000, TimeUnit.MILLISECONDS); }
@RequestMapping("/publishConfig") @SneakyThrows public String publishConfig(String dataId, String configInfo) { log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo); Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId); for (AsyncTask asyncTask : asyncTasks) { asyncTask.setTimeout(false); HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse(); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().println(configInfo); asyncTask.getAsyncContext().complete(); } return "success"; }
public static void main(String[] args) { SpringApplication.run(ConfigServer.class, args); } }
|