※当サイトの記事には、広告・プロモーションが含まれます。

RxJavaって?Reactive Extensions for the JVM(Java Virtual Machine)ってことみたい

f:id:ts0818:20210428220434j:plain

www.ibm.com

www.ibm.com

gan-mag.com

光免疫療法は、2020年9月に頭頸部がん(顔や首の周りにできるがんの総称)を対象に、世界に先駆けて日本で承認されました。

世界初のがん光免疫療法が国内で承認 | 注目の治療・研究 | 「がん治療」新時代

なお、1回の治療にかかる医療費は、薬剤費、装置代、手術費などを含め600万円程度とされていますが、保険診療のため、患者さんの自己負担は抑えられています。

世界初のがん光免疫療法が国内で承認 | 注目の治療・研究 | 「がん治療」新時代

⇧ まじか...、癌が治る時代がやって来るかもしれないなんて、全人類が歓喜の涙ですね、どうもボクです。

というわけで、今回はJavaについてです。

レッツトライ~。

 

ReactiveXって?

「RxJava」ってものは「ReactiveX」ってものを元にしてるらしいので、「ReactiveX」について確認してみますか。

reactivex.io

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

http://reactivex.io/intro.html

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

http://reactivex.io/intro.html

⇧「observable sequences」でプログラミングされたライブラリですと。

「Observable」って何よ? 

reactivex.io

In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits. This pattern facilitates concurrent operations because it does not need to block while waiting for the Observable to emit objects, but instead it creates a sentry in the form of an observer that stands ready to react appropriately at whatever future time the Observable does so.

http://reactivex.io/documentation/observable.html

⇧ ドキュメントによると、「In ReactiveX an observer subscribes to an Observable.」って言ってるので、「ReactiveX」を実現するための一要素が「Observable」って位置づけだと思うんだけど、いまいちよう分からんね...

そして気になるのは、図には「observer」の説明が一切ないのだが...

Wikipediaを見ろ、と言ってるみたいなので、見てみた。

The observer pattern is a software design pattern in which an object, named the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.

https://en.wikipedia.org/wiki/Observer_pattern

⇧ というか、「subject」とかまた知らんものがでてきたやんけ~!

ネットの情報を確認したところ、

⇧ なるほど、「ReactiveX」的に「subject」が「Observable」になるってことなのね、「ReactiveX」さんよ、紛らわしいことしてくれるね、本当に...。 

で、シーケンス的には、

⇧ 上記のような感じになるようです。

「Observer」が起点っぽい書き方なんですが、当然のことながら、「subject(Observable)」が外部からデータを受け取っている部分については端折られてるわけですね。

なので、全体像としては、

⇧ 上図のような感じになるかと。(上図は「Observer」からの通知が無くなってるけど...完全な情報が無いな~...)

「Client」から「Observable」がデータとかを受け取って、「Observer」が「Observable」に受け入れ準備が整った旨を通知してから、「Observable」は「Observer」に対して「データ」を渡していく。

そして、「Observer」は受け取った「データ」を処理して、その間「Observable」は別の事を処理する感じですかね。

「Observer」は「データ」の処理が終わったら、「Observable」に通知して、通知を受け取った「Observer」は「Client」に何かしらのレスポンスを返す、みたいな流れになるんですかね。 

超ザックリ言うと、「ReactiveX」は「非同期処理」を実現する「ライブラリ」の実現を目指す「プロジェクト」ってことなんですかね。

 

RxJavaはReactiveXの一プロジェクトらしい?

GitHub」を見た感じでは、 

github.com

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

https://github.com/ReactiveX/RxJava

⇧ 「ReactiveX/RxJava」っていう「リポジトリ名」からして、「ReactiveX」の一プロジェクトという位置付けで良さ気っぽい感じですかね。 

「ReactiveX」のリポジトリを確認した感じだと、

github.com

⇧ 2021年4月29日(木)時点で、42リポジトリとあるので、42のプログラミング言語に対応してるってことになるのかしら? 

 

RxJavaには「Observable」の他に「Flowable」が存在する

で、無茶苦茶ややこしいことに、「ReactiveX」だと「Observable」が云々かんぬんって話だったのに、「RxJava」 には、何と!

azunobu.hatenablog.com

Reactive Streamsの概念では、データを生産し通知する役割を担う生産者と通知されてきたデータを受け取って処理を行う消費者がいます。消費者が生産者を購読することで生産者からの通知を受け取ることができ、データを受け取ることが可能になります。

RxJava - Flowable / Observable と Subscriber / Observer について - 22時に寝ようと思って2時に寝る。

RxJava では、この両者の関係が2つ存在します。

  生産者 消費者
Reactive Streams 対応 Flowable Subscriber
Reactive Streams 非対応 Observable Observer

大きな特徴として、Reactive Streams 非対応の Observable / Observer では backpressure がありません。前回、記事でも説明した backpressure は以下のようなことを実現します。

RxJava - Flowable / Observable と Subscriber / Observer について - 22時に寝ようと思って2時に寝る。

⇧「Observable」の他にも「Flowable」ってのが存在するんだそうな...

というか「Reactive Streams」って何すか?

 

Reactive Streamsって?

Wikipediaさんに聞いてみる。

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

https://en.wikipedia.org/wiki/Reactive_Streams

Reactive Streams started as an initiative in late 2013 between engineers at NetflixPivotal and Lightbend. Some of the earliest discussions began in 2013 between the Play and Akka teams at Lightbend.

Lightbend is one of the main contributors of Reactive Streams. Other contributors include Red HatOracleTwitter and spray.io.

https://en.wikipedia.org/wiki/Reactive_Streams

⇧ どうも、発祥は2013年ってことになるのかな?

The specification developed with the intent of future inclusion in the official Java standard library, if proven successful and adopted by enough libraries and vendors.

https://en.wikipedia.org/wiki/Reactive_Streams

Reactive Streams were proposed to become part of Java 9 by Doug Lea, leader of JSR 166 as a new Flow class that would include the interfaces currently provided by Reactive Streams.

https://en.wikipedia.org/wiki/Reactive_Streams

After a successful 1.0 release of Reactive Streams and growing adoption, the proposal was accepted and Reactive Streams was included in JDK9 via the JEP-266.

https://en.wikipedia.org/wiki/Reactive_Streams

⇧はい、出ました、「Java 9」で正式に採用されたもの?らしいね...

On April 30, 2015 version 1.0.0 of Reactive Streams for the JVM was released, including Java API, a textual specification, a TCK and implementation examples. It comes with a multitude of compliant implementations verified by the TCK for 1.0.0, listed in alphabetical order:

https://en.wikipedia.org/wiki/Reactive_Streams

⇧ 2015年に「Java API」に含まれたそうな。

一応、公式っぽいサイトによると、

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

https://www.reactive-streams.org/

⇧「JVM and JavaScript」環境を対象、って言ってますね。

で、「Observer pattern」との違いって?

codeburst.io

⇧ 上図のような違いがあるようです。

よく分からんのだけど、「RxJava」の話に限定すると、

  • Reactive Streams対応
    • Flowable
      Publish/Subcribe Pattern
  • Reactive Streams非対応
    • Observable
      Observer Pattern

って感じになるのかね?

ちなみに、「Reactive Streams」については、

github.com

⇧ 2021年4月29日(木)時点で、9リポジトリということで「ReactiveX」に比べると対応されてるプログラミング言語はまだそれほどない模様。

 

実際に使ってみた

というわけで、実際にどんなもんか動かしてみますか。

codezine.jp

hydrakecat.hatenablog.jp

pppurple.hatenablog.com

qiita.com

⇧ 上記サイト様を参考にさせていただきました。

注意すべき点としては、 「RxJava」自体が安定してないらしく、実装が頻繁に変わってるっぽいというところですかね。

Eclipseで「Gradleプロジェクト」を作成して、ファイルを編集していきます。

f:id:ts0818:20210428220704p:plain

■/Java_one_hundred_knock_Rx/build.gradle

/*
 * This file was generated by the Gradle 'init' task.
 *
 * This generated file contains a sample Java Library project to get you started.
 * For more details take a look at the Java Libraries chapter in the Gradle
 * User Manual available at https://docs.gradle.org/6.3/userguide/java_library_plugin.html
 */

plugins {
    // Apply the java-library plugin to add support for Java Library
    id 'java-library'
}

repositories {
    // Use jcenter for resolving dependencies.
    // You can declare any Maven/Ivy/file repository here.
    jcenter()
}

dependencies {
    // This dependency is exported to consumers, that is to say found on their compile classpath.
    api 'org.apache.commons:commons-math3:3.6.1'

    // This dependency is used internally, and not exposed to consumers on their own compile classpath.
    implementation 'com.google.guava:guava:28.2-jre'

    // https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava
    implementation group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.21'

    // https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams
    implementation group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.3'

    // Use JUnit test framework
    testImplementation 'junit:junit:4.12'
}

⇧ おそらく、色の付いたライブラリが読み込めていれば「RxJava」を使うには問題ないかと。

■/Java_one_hundred_knock_Rx/src/main/java/Java_one_hundred_knock_Rx/app/Fibonacci.java

package Java_one_hundred_knock_Rx.app;

import java.math.BigInteger;

import io.reactivex.Observable;
import io.reactivex.subjects.BehaviorSubject;

public class Fibonacci {

  private final BehaviorSubject<BigInteger> value = BehaviorSubject.create();

  public Observable<BigInteger> getValue() {
    return value.hide();
  }

  public void setValue(BigInteger num) {
    value.onNext(num);
  }

  public void setSource(Observable<BigInteger> observable0, Observable<BigInteger> observable1, BigInteger limit) {
    Observable.zip(observable0, observable1, (i, j) -> i .add(j) )
    .filter(i -> i.compareTo(limit) == -1 )
    .subscribe(i -> value.onNext(i));
  }
}

■/Java_one_hundred_knock_Rx/src/main/java/Java_one_hundred_knock_Rx/app/App.java

package Java_one_hundred_knock_Rx.app;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;

public class App {

  public static void main(String[] args) {
    // TODO 自動生成されたメソッド・スタブ
    //BigInteger input =  new BigDecimal(Math.pow(2, 1023)).toBigInteger() ;
    BigInteger input =  new BigDecimal(Math.pow(2, 1023)).pow(2, MathContext.UNLIMITED).toBigInteger() ;

    Fibonacci a = new Fibonacci();
    Fibonacci b = new Fibonacci();
    Fibonacci c = new Fibonacci();

    a.getValue().subscribe(i -> System.out.print(i +"\t"));
    b.getValue().subscribe(i -> System.out.print(i +"\t"));
    c.getValue().subscribe(i -> System.out.print(i +"\t"));

    a.setSource(b.getValue(), c.getValue(), input);
    b.setSource(c.getValue(), a.getValue(), input);
    c.setSource(a.getValue(), b.getValue(), input);

    a.setValue(BigInteger.ZERO);
    b.setValue(BigInteger.ONE);
  }
}

⇧ で実行してみた。

3875個の「フィボナッチ数」が得られるみたいです。

全然関係ないけども、BigDecimalでnewするときの引数に設定できる数値の大きさの上限を確認してみたところ、 

 2^{1023} が限界みたいで、

8079251517827751825178719172167487990111025667428871008032586356881163784716972723299300352880728365922179490230474504873529889787622730273772038096612070780157719341825249022937549437597413026699014409596016892069198054660654939040459523584619042617645411463009076260721893972885266452151888099780982596380478583347417085605171243696641142373714044008831580514519451414832756548177115078537564648216044279181485900929615464339399587788075411476100924403308321807806781421177705052431289275431732830867419635645164174483761499317088249659553881291597359333885900533858307401161329619651238037048388963402764899057664

⇧ ってな値になりました。逆に言うと、普通のパソコン使ってるような環境だと、この数値までしか「フィボナッチ数」が作り出せないってことになるのかね?

ちなみに、BigDecimalでnewする時の2の乗数の限界は  2^{1023} なんだけど、そこからさらに数を大きくする?ことはできるっぽいのかな...

■「フィボナッチ数」の数:1940個 できる上限の数値

BigInteger input = new BigDecimal(Math.pow(2, 1023)).toBigInteger();

■「フィボナッチ数」の数:3875個 できる上限の数値

BigInteger input = new BigDecimal(Math.pow(2, 1023)).pow(2, MathContext.UNLIMITED).toBigInteger();    

⇧ なんか、「フィボナッチ数」の数がだいぶ変わってきたのよね...

そして、 2^{1024} だと「NumberFormatException」が起こるみたい。

f:id:ts0818:20210428210951p:plain

怒られた箇所は、 

    public BigDecimal(double val, MathContext mc) {
        if (Double.isInfinite(val) || Double.isNaN(val))
            throw new NumberFormatException("Infinite or NaN");
 

ってなってたのですが、「BigDecimal」って「Double」型で判定してたんか~い!っていうツッコミを入れたくなったけど、処理できる範囲を超えてるてことみたいね。

というか、「NumberFormatException」ってエラーにされると、ややこしいから止めて欲しいよね...

まぁ、エラーメッセージが、

f:id:ts0818:20210428211906p:plain

⇧「Exception in thread "main" java.lang.NumberFormatException: Infinite or NaN」ってなってるけども、「数値」を指定してるのに「数値じゃないフォーマット」で怒られるのは納得がいかんから、エラーメッセージをどうにかして欲しいっすな。

というわけで、最後は「RxJava」とは全く関係ない話で終わるというね...しかも「Flowable」使ってないから「Reactive Streams」は試せておりませんと...

結局のところ、Javaで扱える数値の大きさの上限ってものがどれぐらいになるのかも分からんかったし...

いや~、モヤモヤ感が半端ないよね...

今回はこのへんで。