VirtualThread Thread.yield() 从jdk测试里摘抄

var list = new CopyOnWriteArrayList<String>();

        var threadsStarted = new AtomicBoolean();

        var threadA = Thread.ofVirtual().unstarted(() -> {
            while (!threadsStarted.get()) {
                Thread.onSpinWait();
            }

            list.add("A");
            Thread.yield();
            list.add("A");
        });

        var threadB = Thread.ofVirtual().unstarted(() -> {
            list.add("B");
        });


        threadA.start();
        threadB.start();


        threadsStarted.set(true);


        threadA.join();
        threadB.join();
        assert  list.stream().collect(Collectors.joining(",")) == List.of("A", "B", "A").stream().collect(Collectors.joining(","));

ScopedValue,StructuredTaskScope

JEP 429: Scoped Values (Incubator) 代码用的java版本是 openjdk 22-internal ,SOURCE=“.:git:ad34be1f329e”

isBound, get


        // get
        ScopedValue<String> name = ScopedValue.newInstance();
        String result = ScopedValue.getWhere(name, "duke", ()->{
            // 在这个scope里是inbound的
            System.out.println(name.isBound());
            // 所以这个scope里才能get到值
            return name.get();
        });
        System.out.println(result);
        System.out.println(name.isBound());

几个开启scope的方法runWhere,callWhere,getWhere


        ScopedValue<String> v1 = ScopedValue.newInstance();
        ScopedValue.runWhere(v1,"new v1 run",()->{
            System.out.println(v1.get());
        });
        ScopedValue.callWhere(v1,"new v1 call",()->{
            System.out.println(v1.get());
            return v1.get();
        });

        ScopedValue.getWhere(v1,"new v1 get",() ->{
            System.out.println(v1.get());
            return v1.get();
        });

        assert "default" == v1.orElse("default");

        ScopedValue.runWhere(v1,"the",()->{
            assert "the" ==  v1.orElse(null);
        });

scope嵌套

        ScopedValue<String> v1 = ScopedValue.newInstance();
        ScopedValue.runWhere(v1,"v1 leve1",()->{
            assert v1.isBound();
            assert "v1 leve1" == v1.get();
            ScopedValue.runWhere(v1,"v1 leve2",()->{
                assert v1.isBound();
                assert "v1 leve2" == v1.get();
            });
            assert v1.isBound();
            assert "v1 leve1" == v1.get();
        });

多值

        ScopedValue<String> name = ScopedValue.newInstance();
        ScopedValue<Integer> age = ScopedValue.newInstance();
        ScopedValue.where(name,"my name")
                .where(age,22)
                .run(()->{
                    assert name.isBound();
                    assert age.isBound();
                    System.out.println(name.get());
                    System.out.println(age.get());

                });

StructuredTaskScope 的PreviewFeature的版本,与19release的版本略有不同

对了 如果一些没有relase的版本的代码片段在IDEA上无法运行,就直接java XXX 吧,java已经可以直接执行java文件了, 加上--enable-preview --source 22即可

fork with virtual thread


   Set<Thread> threads = ConcurrentHashMap.newKeySet();
    try (var scope = new StructuredTaskScope<Object>("v",
                                    // 通过虚拟线程创建100个fork非常快
                                        Thread.ofVirtual().factory())) {
      for (int i = 0; i < 100; i++) {
        scope.fork(() -> {
          threads.add(Thread.currentThread());
          return null;
        });
      }
      scope.join();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    assert 100 == threads.size();
    assert 100 == threads.stream().filter(t->t.isVirtual()).count();

ShutdownOnSuccess


// 源码处
            if (subtask.state() == Subtask.State.SUCCESS) {
                // task succeeded
                T result = subtask.get();
                Object r = (result != null) ? result : RESULT_NULL;
                if (FIRST_RESULT.compareAndSet(this, null, r)) {
                    // 确认是第一个成功的就shutdown
                    super.shutdown();
                }
            } 
// 比如 

try(var scope = new StructuredTaskScope.ShutdownOnSuccess<>()) {
      StructuredTaskScope.Subtask<Object> f1 = scope.fork(()->{
        return "1";
      });
      StructuredTaskScope.Subtask<Object> f2 = scope.fork(()->{
        TimeUnit.SECONDS.sleep(1);
        return "2";
      });
      System.out.println(f1.state());
      System.out.println(f2.state());
      scope.join();
      System.out.println("join");
      System.out.println(f1.state());
      System.out.println(f2.state());
      // get会报错,因为其中一个成功后其他的已经取消了
//      System.out.println(f1.get());
//      System.out.println(f2.get());

      System.out.println(scope.result());

    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }

自定义scope handle

public static class MyScopeException extends RuntimeException {}

  public static class MyScope extends StructuredTaskScope<String> {
    private final Collection<String> oks = new ConcurrentLinkedDeque<>();
    private final Collection<Throwable> errs = new ConcurrentLinkedDeque<>();

    @Override
    protected void handleComplete(Subtask<? extends String> subtask) {
      switch (subtask.state()){
        case UNAVAILABLE : throw new IllegalStateException("");
        case SUCCESS : this.oks.add(subtask.get());break;
        case FAILED : this.errs.add(subtask.exception());break;
        default : {}break;
      }
    }

    public MyScopeException errors(){
      MyScopeException exception = new MyScopeException();
      errs.forEach(exception::addSuppressed);
      return exception;
    }

    public String myResult(){
      return oks.stream().findFirst().orElseThrow(this::errors);
    }
  }

使用自定义的scope


try(var scope = new MyScope()) {
      scope.fork(()->{
        TimeUnit.SECONDS.sleep(1);
        return "1";
      });
      scope.fork(()->{
        return "2";
      });

      scope.join();

      System.out.println(scope.myResult());
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }