java协程框架,实例介绍Java基于quasar实现协程池

文章主要介绍了java的协程框架,并且小编用实例的形式介绍了java协程框架运行耗时,与quasar和kotlin中的协程对比分析。

文章给大家带来了关于java的相关知识,其中主要整理了基于quasar实现协程池的相关问题,一个线程可以多个协程,一个进程也可以单独拥有多个协程,线程进程都是同步机制,而协程则是异步,下面一起来看一下。

业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)

一个线程可以多个协程,一个进程也可以单独拥有多个协程。

线程进程都是同步机制,而协程则是异步。

协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。

线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。

协程并不是取代线程,而且抽象于线程之上,线程是被分割的CPU资源,协程是组织好的代码流程,协程需要线程来承载运行,线程是协程的资源,但协程不会直接使用线程,协程直接利用的是执行器(Interceptor),执行器可以关联任意线程或线程池,可以使当前线程,UI线程,或新建新程.。

线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。

实例介绍Java基于quasar实现协程池
实例介绍Java基于quasar实现协程池

废话不多说,直接上代码:

导入包:

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-core</artifactId>
    <version>0.7.9</version>
    <classifier>jdk8</classifier>
</dependency>
WorkTools工具类:
package com.example.ai;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;

import java.util.concurrent.ArrayBlockingQueue;

public class WorkTools {

    //协程池中默认协程的个数为5
    private static int WORK_NUM = 5;

    //队列默认任务为100
    private static int TASK_COUNT = 100;

    //工做协程数组
    private Fiber[] workThreads;

    //等待队列
    private final ArrayBlockingQueue<SuspendableRunnable> taskQueue;

    //用户在构造这个协程池时,但愿启动的协程数
    private final int workerNum;

    //构造方法:建立具备默认协程个数的协程池
    public WorkTools() {

        this(WORK_NUM,TASK_COUNT);
    }


    //建立协程池,workNum为协程池中工做协程的个数
    public WorkTools(int workerNum, int taskCount) {

        if (workerNum <= 0) {

            workerNum = WORK_NUM;

        }

        if (taskCount <= 0) {

            taskCount = TASK_COUNT;

        }

        this.workerNum = workerNum;

        taskQueue = new ArrayBlockingQueue(taskCount);

        workThreads = new Fiber[workerNum];

        for (int i = 0; i < workerNum; i++) {

            int finalI = i;

            workThreads[i] = new Fiber<>(new SuspendableRunnable() {

                @Override

                public void run() throws SuspendExecution, InterruptedException {

                    SuspendableRunnable runnable = null;

                    while (true){

                        try{

                            //取任务,没有则阻塞。
                            runnable = taskQueue.take();

                        }catch (Exception e){

                            System.out.println(e.getMessage());

                        }

                        //存在任务则运行。
                        if(runnable != null){

                            runnable.run();

                        }

 

                        runnable = null;

                    }

                }

            });  //new一个工做协程

 

            workThreads[i].start();  //启动工做协程

 

        }

        Runtime.getRuntime().availableProcessors();

    }

    //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定
    public void execute(SuspendableRunnable task) {

        try {

            taskQueue.put(task);   //put:阻塞接口的插入

        } catch (Exception e) {

            // TODO: handle exception

            System.out.println("阻塞");

        }

    }

    //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
    public void destory() {

        //工做协程中止工做,且置为null

        System.out.println("ready close thread...");

        for (int i = 0; i < workerNum; i++) {

 

            workThreads[i] = null; //help gc

        }

        taskQueue.clear();  //清空等待队列

    }

    //覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数
    @Override

    public String toString() {

        return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size();

    }
}

测试代码:

package com.example.ai;
import co.paralleluniverse.strands.SuspendableRunnable;

import lombok.SneakyThrows;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.CountDownLatch;

@SpringBootApplication

public class AiApplication {

    @SneakyThrows

    public static void main(String[] args) {

        //等待协程任务完毕后再结束主线程

        CountDownLatch cdl = new CountDownLatch(50);

        //开启5个协程,50个任务列队。

        WorkTools myThreadPool = new WorkTools(5, 50);

        for (int i = 0; i< 50; i++){

            int finalI = i;

            myThreadPool.execute(new SuspendableRunnable() {

                @Override

                public void run() {

                    System.out.println(finalI);

                    try {

                        //延迟1秒

                        Thread.sleep(1000);

                        cdl.countDown();

                    } catch (InterruptedException e) {

                        System.out.println("阻塞中");

                    }

                }

            });

        }

        //阻塞

        cdl.await();

    }
}
实例介绍Java基于quasar实现协程池
实例介绍Java基于quasar实现协程池

具体代码都有注释了,自行了解。我也是以线程池写法实现。

当前为解决问题:在协程阻塞过程中Fiber类会报阻塞警告,满脸懵逼啊,看着很讨厌。暂时没有办法处理,看各位大神谁有招下方评论提供给下思路。万分感谢~

java协程框架对比

早就听说Go语言开发的服务不用任何架构优化,就可以轻松实现百万级别的qps。这得益于Go语言级别的协程的处理效率。协程不同于线程,线程是操作系统级别的资源,创建线程,调度线程,销毁线程都是重量级别的操作。而且线程的资源有限,在java中大量的不加限制的创建线程非常容易将系统搞垮。接下来要分享的这个开源项目,正是解决了在java中只能使用多线程模型开发高并发应用的窘境,使得java也能像Go语言那样使用协程的语义开发了。

quasar项目地址:https://github.com/puniverse/quasar

quasar周边项目地址:https://github.com/puniverse/comsat

快速体验

添加依赖

<dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.10</version>
</dependency>

注意:目前quasar最高的版本是0.8.0,但是最高版本的只支持jdk11以上

添加java agent

quasar的实现原理是在java加载class前,通过jdk的instrument机制使用asm来修改目标class的字节码来实现的,他标记了协程代码的起始和结束的位置,以及方法需要暂停的位置,每个协程任务统一由FiberScheduler去调度,内部维护了一个或多个ForkJoinPool实例。所以,在运行应用前,需要配置好quasar-core的javaagent地址,在vm参数上加上如下脚本即可:

-javaagent:D:\.m2\repository\co\paralleluniverse\quasar-core\0.7.10\quasar-core-0.7.10.jar

线程VS协程

下面模拟调用某个远程的服务,假设远程服务处理耗时需要1S,这里使用执行阻塞1S来模拟,分别看多线程模型和协程模型调用这个服务10000次所需的耗时

协程代码

public static void main(String[] args) throws Exception{
        CountDownLatch count  = new CountDownLatch(10000);
        StopWatch stopWatch = new StopWatch();stopWatch.start();
        IntStream.range(0,10000).forEach(i-> new Fiber() {
            @Override
            protected String run() throws SuspendExecution, InterruptedException {
                Strand.sleep(1000 );
                count.countDown();
                return  "aa";
            }
        }.start());
        count.await();stopWatch.stop();
        System.out.println("结束了: " + stopWatch.prettyPrint());
    }

耗时情况:

java协程框架对比
java协程框架对比

多线程代码

public static void main(String[] args) throws Exception{
        CountDownLatch count  = new CountDownLatch(10000);
        StopWatch stopWatch = new StopWatch();stopWatch.start();
        ExecutorService executorService = Executors.newCachedThreadPool();
        IntStream.range(0,10000).forEach(i-> executorService.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ex) { }
            count.countDown();
        }));
        count.await();stopWatch.stop();
        System.out.println("结束了: " + stopWatch.prettyPrint());
    }

耗时情况

java协程框架对比
java协程框架对比

协程完胜

可以看到上面的结果,在对比访问一个耗时1s的服务10000次时,协程只需要2秒多,而多线程模型需要4秒多,时效相差了一倍。而且上面多线程编程时,并没有指定线程池的大小,在实际开发中是绝不允许的。一般我们会设置一个固定大小的线程池,因为线程资源是宝贵,线程多了费内存还会带来线程切换的开销。上面的场景在设置200个固定大小线程池时。结果也是可预见的达到了50多秒。这个结果足以证明协程编程ko线程编程了。而且在qps越大时,线程处理的效率和协程的差距就约明显,缩小差距的唯一方式就是增加线程数,而这带来的影响就是内存消耗激增。而反观协程,基于固定的几个线程调度,可以轻松实现百万级的协程处理,而且内存稳稳的。

后记

最后,博主以为Quasar只是一个框架层面的东西,所以就又去看了下同样是jvm语言的kotlin的协程。他的语言更简洁,可以直接和java混合使用。跑上面这种实例只需要1秒多。

fun main() {
    val count = CountDownLatch(10000)
    val stopWatch = StopWatch()
    stopWatch.start()
    IntStream.range(0,10000).forEach {
        GlobalScope.launch {
            delay(1000L)
            println(Thread.currentThread().name + "->"+ it)
            count.countDown()
        }
    }
    count.await()
    stopWatch.stop()
    println("结束了: " + stopWatch.prettyPrint())
}

当博主看到这个结果的时候,有种震惊的赶脚,kotlin的同步模型牛逼呀,瞬时感觉到发现了java里的骚操作了,可以使用kotlin的协程来代替java中的多线程操作。因为他们两个混合开发毫无压力。如果行的通,那就太爽了。所以就有下面这个kotlin协程实现的代码:

@Service
class KotlinAsyncService(private val weatherService: GetWeatherService,private val demoApplication: DemoApplication){
    val weatherUrl = "http://localhost:8080/demo/mockWeatherApi?city="
    fun getHuNanWeather(): JSONObject{
        val result = JSONObject()
        val count = CountDownLatch(demoApplication.weatherContext.size)
        for (city in demoApplication.weatherContext){
            val url = weatherUrl + city.key
            GlobalScope.launch {
                result[city.key.toString()] = weatherService.get(url)
                count.countDown()
            }
        }
        count.await()
        return result
    }
}

现实是,当我使用协程替换掉我java多线程写的一个多线程汇聚多个http接口的结果的接口时,通过ab压测他们两个的性能并没有很大的变化,最后了解到主要原因是这个时候,在协程里发起一个http的请求时,涉及到操作系统层面的socketio操作,io操作是阻塞的,协程的并发也就变成了调度协程的几个线程的并发了。而且当我把同样的代码放到Quasar中的时候,Quasar直接抛io异常了,说明Quasar还并不能轻松支持这个场景。那为什么上面的测试结果差距这么大呢,是因为我错误的把协程实现里的阻塞等同于线程的阻塞。协程里的delay挂起函数,会立马释放线程到线程池,但是当真正的io阻塞的时候也就和真正的线程sleep一样了,并没有释放当前的线程。所以这些对比都没有太大的意义。

总结:以上就是关于java协程框架,与实例介绍Java基于quasar实现协程池,包括了java协程框架quasar和kotlin中的协程对比分析的详细内容,更多关于java框架quasar和kotlin协程对比的资料请关注钦钦技术栈。

版权声明:本文(即:原文链接:https://www.qin1qin.com/catagory/637/)内容由互联网用户自发投稿贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 630367839@qq.com 举报,一经查实,本站将立刻删除。

(0)
上一篇 2022-06-30 1:07:20
下一篇 2022-06-30 4:06:37

软件定制开发公司

相关阅读

发表回复

登录后才能评论
通知:禁止投稿所有关于虚拟货币,币圈类相关文章,发现立即永久封锁账户ID!