文章给大家带来了关于java的相关知识,其中主要整理了基于quasar实现协程池的相关问题,一个线程可以多个协程,一个进程也可以单独拥有多个协程,线程进程都是同步机制,而协程则是异步,下面一起来看一下。
业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)
一个线程可以多个协程,一个进程也可以单独拥有多个协程。
线程进程都是同步机制,而协程则是异步。
协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。
线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。
协程并不是取代线程,而且抽象于线程之上,线程是被分割的CPU资源,协程是组织好的代码流程,协程需要线程来承载运行,线程是协程的资源,但协程不会直接使用线程,协程直接利用的是执行器(Interceptor),执行器可以关联任意线程或线程池,可以使当前线程,UI线程,或新建新程.。
线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。

废话不多说,直接上代码:
导入包:
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.7.9</version>
<classifier>jdk8</classifier>
</dependency>
WorkTools工具类:
package com.example.ai;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;
import java.util.concurrent.ArrayBlockingQueue;
public class WorkTools {
//协程池中默认协程的个数为5
private static int WORK_NUM = 5;
//队列默认任务为100
private static int TASK_COUNT = 100;
//工做协程数组
private Fiber[] workThreads;
//等待队列
private final ArrayBlockingQueue<SuspendableRunnable> taskQueue;
//用户在构造这个协程池时,但愿启动的协程数
private final int workerNum;
//构造方法:建立具备默认协程个数的协程池
public WorkTools() {
this(WORK_NUM,TASK_COUNT);
}
//建立协程池,workNum为协程池中工做协程的个数
public WorkTools(int workerNum, int taskCount) {
if (workerNum <= 0) {
workerNum = WORK_NUM;
}
if (taskCount <= 0) {
taskCount = TASK_COUNT;
}
this.workerNum = workerNum;
taskQueue = new ArrayBlockingQueue(taskCount);
workThreads = new Fiber[workerNum];
for (int i = 0; i < workerNum; i++) {
int finalI = i;
workThreads[i] = new Fiber<>(new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
SuspendableRunnable runnable = null;
while (true){
try{
//取任务,没有则阻塞。
runnable = taskQueue.take();
}catch (Exception e){
System.out.println(e.getMessage());
}
//存在任务则运行。
if(runnable != null){
runnable.run();
}
runnable = null;
}
}
}); //new一个工做协程
workThreads[i].start(); //启动工做协程
}
Runtime.getRuntime().availableProcessors();
}
//执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定
public void execute(SuspendableRunnable task) {
try {
taskQueue.put(task); //put:阻塞接口的插入
} catch (Exception e) {
// TODO: handle exception
System.out.println("阻塞");
}
}
//销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
public void destory() {
//工做协程中止工做,且置为null
System.out.println("ready close thread...");
for (int i = 0; i < workerNum; i++) {
workThreads[i] = null; //help gc
}
taskQueue.clear(); //清空等待队列
}
//覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数
@Override
public String toString() {
return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size();
}
}
测试代码:
package com.example.ai;
import co.paralleluniverse.strands.SuspendableRunnable;
import lombok.SneakyThrows;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.concurrent.CountDownLatch;
@SpringBootApplication
public class AiApplication {
@SneakyThrows
public static void main(String[] args) {
//等待协程任务完毕后再结束主线程
CountDownLatch cdl = new CountDownLatch(50);
//开启5个协程,50个任务列队。
WorkTools myThreadPool = new WorkTools(5, 50);
for (int i = 0; i< 50; i++){
int finalI = i;
myThreadPool.execute(new SuspendableRunnable() {
@Override
public void run() {
System.out.println(finalI);
try {
//延迟1秒
Thread.sleep(1000);
cdl.countDown();
} catch (InterruptedException e) {
System.out.println("阻塞中");
}
}
});
}
//阻塞
cdl.await();
}
}

具体代码都有注释了,自行了解。我也是以线程池写法实现。
当前为解决问题:在协程阻塞过程中Fiber类会报阻塞警告,满脸懵逼啊,看着很讨厌。暂时没有办法处理,看各位大神谁有招下方评论提供给下思路。万分感谢~
java协程框架对比
早就听说Go语言开发的服务不用任何架构优化,就可以轻松实现百万级别的qps。这得益于Go语言级别的协程的处理效率。协程不同于线程,线程是操作系统级别的资源,创建线程,调度线程,销毁线程都是重量级别的操作。而且线程的资源有限,在java中大量的不加限制的创建线程非常容易将系统搞垮。接下来要分享的这个开源项目,正是解决了在java中只能使用多线程模型开发高并发应用的窘境,使得java也能像Go语言那样使用协程的语义开发了。
quasar项目地址:https://github.com/puniverse/quasar
quasar周边项目地址:https://github.com/puniverse/comsat
快速体验
添加依赖
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.7.10</version>
</dependency>
注意:目前quasar最高的版本是0.8.0,但是最高版本的只支持jdk11以上
添加java agent
quasar的实现原理是在java加载class前,通过jdk的instrument机制使用asm来修改目标class的字节码来实现的,他标记了协程代码的起始和结束的位置,以及方法需要暂停的位置,每个协程任务统一由FiberScheduler去调度,内部维护了一个或多个ForkJoinPool实例。所以,在运行应用前,需要配置好quasar-core的javaagent地址,在vm参数上加上如下脚本即可:
-javaagent:D:\.m2\repository\co\paralleluniverse\quasar-core\0.7.10\quasar-core-0.7.10.jar
线程VS协程
下面模拟调用某个远程的服务,假设远程服务处理耗时需要1S,这里使用执行阻塞1S来模拟,分别看多线程模型和协程模型调用这个服务10000次所需的耗时
协程代码
public static void main(String[] args) throws Exception{
CountDownLatch count = new CountDownLatch(10000);
StopWatch stopWatch = new StopWatch();stopWatch.start();
IntStream.range(0,10000).forEach(i-> new Fiber() {
@Override
protected String run() throws SuspendExecution, InterruptedException {
Strand.sleep(1000 );
count.countDown();
return "aa";
}
}.start());
count.await();stopWatch.stop();
System.out.println("结束了: " + stopWatch.prettyPrint());
}
耗时情况:

多线程代码
public static void main(String[] args) throws Exception{
CountDownLatch count = new CountDownLatch(10000);
StopWatch stopWatch = new StopWatch();stopWatch.start();
ExecutorService executorService = Executors.newCachedThreadPool();
IntStream.range(0,10000).forEach(i-> executorService.submit(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ex) { }
count.countDown();
}));
count.await();stopWatch.stop();
System.out.println("结束了: " + stopWatch.prettyPrint());
}
耗时情况

协程完胜
可以看到上面的结果,在对比访问一个耗时1s的服务10000次时,协程只需要2秒多,而多线程模型需要4秒多,时效相差了一倍。而且上面多线程编程时,并没有指定线程池的大小,在实际开发中是绝不允许的。一般我们会设置一个固定大小的线程池,因为线程资源是宝贵,线程多了费内存还会带来线程切换的开销。上面的场景在设置200个固定大小线程池时。结果也是可预见的达到了50多秒。这个结果足以证明协程编程ko线程编程了。而且在qps越大时,线程处理的效率和协程的差距就约明显,缩小差距的唯一方式就是增加线程数,而这带来的影响就是内存消耗激增。而反观协程,基于固定的几个线程调度,可以轻松实现百万级的协程处理,而且内存稳稳的。
后记
最后,博主以为Quasar只是一个框架层面的东西,所以就又去看了下同样是jvm语言的kotlin的协程。他的语言更简洁,可以直接和java混合使用。跑上面这种实例只需要1秒多。
fun main() {
val count = CountDownLatch(10000)
val stopWatch = StopWatch()
stopWatch.start()
IntStream.range(0,10000).forEach {
GlobalScope.launch {
delay(1000L)
println(Thread.currentThread().name + "->"+ it)
count.countDown()
}
}
count.await()
stopWatch.stop()
println("结束了: " + stopWatch.prettyPrint())
}
当博主看到这个结果的时候,有种震惊的赶脚,kotlin的同步模型牛逼呀,瞬时感觉到发现了java里的骚操作了,可以使用kotlin的协程来代替java中的多线程操作。因为他们两个混合开发毫无压力。如果行的通,那就太爽了。所以就有下面这个kotlin协程实现的代码:
@Service
class KotlinAsyncService(private val weatherService: GetWeatherService,private val demoApplication: DemoApplication){
val weatherUrl = "http://localhost:8080/demo/mockWeatherApi?city="
fun getHuNanWeather(): JSONObject{
val result = JSONObject()
val count = CountDownLatch(demoApplication.weatherContext.size)
for (city in demoApplication.weatherContext){
val url = weatherUrl + city.key
GlobalScope.launch {
result[city.key.toString()] = weatherService.get(url)
count.countDown()
}
}
count.await()
return result
}
}
现实是,当我使用协程替换掉我java多线程写的一个多线程汇聚多个http接口的结果的接口时,通过ab压测他们两个的性能并没有很大的变化,最后了解到主要原因是这个时候,在协程里发起一个http的请求时,涉及到操作系统层面的socketio操作,io操作是阻塞的,协程的并发也就变成了调度协程的几个线程的并发了。而且当我把同样的代码放到Quasar中的时候,Quasar直接抛io异常了,说明Quasar还并不能轻松支持这个场景。那为什么上面的测试结果差距这么大呢,是因为我错误的把协程实现里的阻塞等同于线程的阻塞。协程里的delay挂起函数,会立马释放线程到线程池,但是当真正的io阻塞的时候也就和真正的线程sleep一样了,并没有释放当前的线程。所以这些对比都没有太大的意义。
总结:以上就是关于java协程框架,与实例介绍Java基于quasar实现协程池,包括了java协程框架quasar和kotlin中的协程对比分析的详细内容,更多关于java框架quasar和kotlin协程对比的资料请关注钦钦技术栈。
版权声明:本文(即:原文链接:https://www.qin1qin.com/catagory/637/)内容由互联网用户自发投稿贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 630367839@qq.com 举报,一经查实,本站将立刻删除。