※当サイトの記事には、広告・プロモーションが含まれます。

Javaの標準APIのWatchServiceを使ってみる。要件によってはMessage Queueの仕組みも使った方が良い気はする

news.mynavi.jp

VS CodeVisual Studio Code)もキャッチアップしてかねばですかね...

フロントエンドまで手が回らんのだけど、

www.publickey1.jp

Bunは2022年7月に登場したソフトウェアです。

公式Webサイトで「Bun is a fast all-in-one JavaScript runtime」と紹介されているように、JavaScriptランタイムとバンドラ、トランスパイラなどが最初から統合され、タスクランナーとして実行できる機能も備えています。

JavaScriptランタイム「Bun」がバージョン1.0に到達へ、9月7日にローンチイベント開催 - Publickey

なによりもBunが急速に注目を集めた理由は、Node.js互換をうたいNode.jsのnpmパッケージがそのままBunでも使えるとしたこと、そして高速性を強調したことでしょう。

JavaScriptランタイム「Bun」がバージョン1.0に到達へ、9月7日にローンチイベント開催 - Publickey

さらに主な開発言語としてZigを採用し、メモリ管理などを含む低レイヤでの実装を実現することで、Node.jsやDenoよりも高速な動作を実現していると説明しています。

JavaScriptランタイム「Bun」がバージョン1.0に到達へ、9月7日にローンチイベント開催 - Publickey

しかし、Node.js互換でありつつ多機能でしかも高速であることをアピールするBunが2022年7月に登場し一気に注目を集めると、Denoは2022年8月にNode.js互換としてnpmパッケージをサポートすることと最速のJavaScriptランタイムになるという方針を発表したのです。

JavaScriptランタイム「Bun」がバージョン1.0に到達へ、9月7日にローンチイベント開催 - Publickey

⇧「Deno」の方針転換のいまさら感が...

ユーザーのことを考慮してこなかった皺寄せが来てる感じですかね。

Javaの標準APIのWatchServiceとは?

Oracleさんの公開しているドキュメントによりますと、

docs.oracle.com

登録されたオブジェクトの変更およびイベントを監視する監視サービスです。たとえば、ファイル・マネージャでは、ファイルが作成または削除されたときにファイル・リストの表示を更新できるように、監視サービスを使ってディレクトリの変更を監視することがあります。

https://docs.oracle.com/javase/jp/8/docs/api/java/nio/file/WatchService.html

registerメソッドを呼び出してWatchableオブジェクトを監視サービスに登録すると、その登録を表すWatchKeyが返されます。オブジェクトのイベントが検出されると、その鍵はsignalledになり、現在signalledになっていない場合は、pollまたはtakeメソッドを呼び出して鍵の取得やイベントの処理を行うコンシューマが取得できるように、監視サービスのキューに入れられます。イベントの処理が完了すると、コンシューマはその鍵のresetメソッドを呼び出して鍵をリセットします。これにより、さらにイベントがあれば、その鍵はsignalledになり、再度キューに入れられるようになります。

https://docs.oracle.com/javase/jp/8/docs/api/java/nio/file/WatchService.html

⇧ とのこと。

事前に、監視したい対象を登録しておく必要がありますと。

いや~、むっちゃ便利な機能がJavaの標準APIで用意されていたんですな、善き哉、善き哉って思ったのですが、

プラットフォームの依存性

ファイル・システムからイベントを監視する実装は、ネイティブ・ファイル・イベント通知機能(使用可能な場合)に直接マップしたり、ネイティブ機能が使用できない場合はポーリングなどの基本メカニズムを使用したりするよう意図されています。その結果、イベントの検出方法、その適用のタイミング、およびその順序が維持されるかどうかに関する詳細の多くは、実装によって大きく異なります。たとえば、監視対象のディレクトリ内のファイルが変更されると、一部の実装では単一のENTRY_MODIFYイベントが発生することがありますが、他の実装では複数のイベントが発生することがあります。有効期間の短いファイル(作成後すぐに削除されるファイルのこと)は、定期的にファイル・システムをポーリングして変更を検出するプリミティブな実装では検出されないことがあります。

https://docs.oracle.com/javase/jp/8/docs/api/java/nio/file/WatchService.html

⇧ まさかのプラットフォームによって挙動が変わってくるかもしれないんだとか...

WatchServiceのAPIの構成について、クラス図を作成してる方がおられて、

⇧ WatchServiceインターフェイスは、

  • LinuxWatchService
  • WindowsWatchService
  • PollingWatchService

のいずれかのクラスによって実装されているらしい。

実際はと言うと、

Linux

github.com

Windows

github.com

■share

github.com

■抽象クラス

github.com

インターフェイス

github.com

GitHubで公開しているソースコードを見た感じでは、「抽象クラス」であるAbstractWatchService.javaがWatchService.javaを実装した形になっている。

だけど、具象クラスとしては、プラットフォーム毎に分かれてるっぽい...

う~む、

その中枢技術であるJava仮想マシンは各プラットフォーム環境間の違いを吸収しながら、Javaプログラムの適切な共通動作を実現する機能を備えているこのテクノロジはwrite once, run anywhere」と標榜されていた。

https://ja.wikipedia.org/wiki/Java

⇧ 各プラットフォーム環境間の違いを吸収できとらんということですかね...

Javaは、ファイルシステムが絡んでくると、破綻するというかいろいろと駄目っぽいですな、つまり、「OS(Operation System)」の違いによって「write once, run anywhere」の理想は永久に成し遂げられることは無いということなんかね...

話が脱線しましたが、WatchServiceのアーキテクチャとしては、

qiita.com

⇧ 上図のような構成になる模様。

事前に、監視対象を登録しておく必要があるっぽいので、

www.concretepage.com

⇧ 全体像としては、上図のようなイメージになるかと。上図だと、「Watchable(Path)」が監視対象としたいオブジェクトになるってことかと。

懸念としては、「WatchService」の処理が至る所に散逸しないかってことですかね...

「WatchService」で検索して該当箇所を探せば良いのかも知らんけど...

監視対象を事前に登録しておく必要があることから、一カ所で済むとは思うけど。

Oracleさんの公開している情報によると、

docs.oracle.com

⇧ 無限ループさせておく必要があるっぽいので、常駐スレッド化する感じになるっぽいですかね。

Javaにおけるプロセスの粒度が分からんのですが、「.class」ファイルは単独で実行できることから、「.class」ファイルの実行によってプロセスが生成されるのであると仮定すると、シングルスレッドの処理においては、常駐スレッド=常駐プロセスと見なせるのかもしれないので、「常駐スレッド」と「常駐プロセス」のどっちと見なして良いか曖昧ですかね...

後述しますが、残念ながら、Javaのプロセスが生成される仕組みは分かりませんでした...

ブラックボックス感が半端ない...

Oracleの「jstack」コマンドのドキュメントを確認すると、

docs.oracle.com

pid

The process ID for which the stack trace is printed. The process must be a Java process. To get a list of Java processes running on a machine, use the jps(1) command.

https://docs.oracle.com/javase/8/docs/technotes/tools/windows/jstack.html

⇧『To get a list of Java processes running on a machine, use the jps(1) command.』とあるので、「jps」コマンドでJavaのプロセスの一覧が取得できるとありますね。

docs.oracle.com

Description

The jps command lists the instrumented Java HotSpot VMs on the target system. The command is limited to reporting information on JVMs for which it has the access permissions.

https://docs.oracle.com/javase/8/docs/technotes/tools/windows/jps.html#CHDGHCGB

⇧ プロセスの話が出て来んのだけど...『lists the instrumented Java HotSpot VMs on the target system』の中にプロセスの情報があると考えれば良いんかね?

atmarkit.itmedia.co.jp

⇧ 上記サイト様によりますと、いま動作してるJavaのプロセスの一覧は、「jps」で取得できそうってことですかね。

GC(Garbage Collection)」に関するプロセスは、「jstat」か「jstatd」で取得する感じになるんだとか。

今時のJavaを利用したWebアプリケーションだと、

atmarkit.itmedia.co.jp

Tomcatのようなアプリケーションサーバーで稼働してることが多いと思うので、warで「.class」ファイルはまとめられてることが多いと思うけど、Javaのプロセスの仕組みが依然として曖昧だ...。

Message Queue とは?

Wikipediaさんによりますと、

In computer sciencemessage queues and mailboxes are software-engineering components typically used for inter-process communication (IPC), or for inter-thread communication within the same process. They use a queue for messaging – the passing of control or of content. Group communication systems provide similar kinds of functionality.

https://en.wikipedia.org/wiki/Message_queue

⇧ 一旦、queueに制御を保持しておけるので、システム間を疎結合にしておけますと。

イメージし易いのは、

aws.amazon.com

AWSのドキュメントの上図のような例でしょうか。

Javaの標準APIのWatchServiceとMessage Queueの仕組みの何が違うのか?

Javaの標準APIのWatchServiceもqueue使っとるやんけ、って思うし実際にqueueを使ってるわけなんですが、

  • Javaの標準APIのWatchService
  • Message Queue

の仕組みの違いって何なのか。

大きな違いは、

  • Javaの標準APIのWatchService
    →監視対象が必須である
  • Message Queue
    →監視対象が必須ではない

ってことでしょうか?

Oracleさんのコーディング例を見ていると、WatchServiceは自分で監視対象を決めることで、queueに追加されてる、というか監視対象ありき。監視対象が無ければ成立しない。

なので、WatchServiceは名前の通り「監視」の機能が主で、Message Queueとは異なるってことで、そもそもの用途が違うってことなんでしょうね。

queue自体は、

In computer science, a queue is a collection of entities that are maintained in a sequence and can be modified by the addition of entities at one end of the sequence and the removal of entities from the other end of the sequence. By convention, the end of the sequence at which elements are added is called the back, tail, or rear of the queue, and the end at which elements are removed is called the head or front of the queue, analogously to the words used when people line up to wait for goods or services.

https://en.wikipedia.org/wiki/Queue_(abstract_data_type)

⇧ データを格納して、格納した順に取り出せる、「FIFO(First In First Out)」を実現する仕組みであると。

であるからして、APIないしライブラリないしソフトウェアなどがqueueをどのように活用してるかで、queueを利用してるAPIないしライブラリないしソフトウェアの用途が変わってきますと。

だから、アーキテクチャ図とかでシステム全体の概要図があると、システムの仕組みが把握しやすいし、具体的な処理を実装する際にも利用できそうなAPIの選定など検討しやすいんだけどな。

後は、好みの問題かもしれんけど、やはり、何らかの処理が完了したタイミングでMessage Queueに対してメッセージを送るとしておけば、何か問題があった場合にどこで問題が起きてるかの原因調査の切り分けがし易くなるような気がする。

何だかんだ言っても一番の問題は、Javaの標準APIのWatchServiceがプラットフォームによって挙動が変わり得るってことですかね。

一般的に、開発作業がWindows、または、Macで、本番環境がLinuxってパターンが多いと思うのだけど、プラットフォームの違いで挙動が異なるのは致命的だと思うし、なお且つ、実装に依りけりってところが怖いんよね...

とは言え、Javaアプリケーション開発で名の知られたSpring Frameworkなどのような、フレームワークとか利用していない、Javaアプリケーションで尚且つ1つのディレクトリを監視できれば良いような要件であれば、Javaの標準APIのWatchServiceが適しているのかしらね。

あれこれ言ってはみたものの、要件次第ってことかね。何を実現したいかをハッキリさせておかないと手戻りが発生しやすくなって辛いところですかね。

Javaの標準APIのWatchServiceを使ってみる

とは言え、実際に使わざるを得ない機会が来ないとも限らないので、選り好みしてるわけにはいかないということで、利用してみることに。

docs.oracle.com

⇧ 公式の情報を参考にしますか。

ちなみに、

seraphy.hatenablog.com

注意点
  1. オーバーフローがありえるので、必ずファイルのパス情報が得られるとは想定できない。
  2. 標準ではサブディレクトリに対する監視を設定する方法が存在しない。ディレクトリごとに個々に監視する必要がある。

JavaSE7でファイルを監視する方法 - seraphyの日記

⇧ 上記サイト様にありますように、サブディレクトリを監視対象とするには、

のいずれかの対応が必要になる模様。

今回は、Oracleさんがサンプルを効果雄してくれているので「2. 再帰的に登録する」方法を試してみます。

注意する点としては、個々のサブディレクトリを監視対象にしたい場合、起点となるディレクトリまでのパスを指定する感じでしょうか。

それでは、Eclipseで「Java プロジェクト」を作成し、以下のようなファイルを作成。

ソースコードは以下のような感じ。まぁ、動作確認用のソースコード以外、Oracleさんのサンプルそのまんまなんですけどね...

■/test-watchservice/src/apps/service/impl/service_watcher/BaseServiceWatcherRecursive.java

package apps.service.impl.service_watcher;

import static java.nio.file.LinkOption.*;
import static java.nio.file.StandardWatchEventKinds.*;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;

public class BaseServiceWatcherRecursive {
    private final WatchService watcher;
    private final Map<WatchKey,Path> keys;
    private final boolean recursive;
    private boolean trace = false;

    @SuppressWarnings("unchecked")
    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>)event;
    }

    /**
     * Register the given directory with the WatchService
     */
    private void register(Path dir) throws IOException {
        WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        if (trace) {
            Path prev = keys.get(key);
            if (prev == null) {
                System.out.format("register: %s\n", dir);
            } else {
                if (!dir.equals(prev)) {
                    System.out.format("update: %s -> %s\n", prev, dir);
                }
            }
        }
        keys.put(key, dir);
    }

    /**
     * Register the given directory, and all its sub-directories, with the
     * WatchService.
     */
    private void registerAll(final Path start) throws IOException {
        // register directory and sub-directories
        Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                throws IOException
            {
                register(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Creates a WatchService and registers the given directory
     */
    BaseServiceWatcherRecursive(Path dir, boolean recursive) throws IOException {
        this.watcher = FileSystems.getDefault().newWatchService();
        this.keys = new HashMap<WatchKey,Path>();
        this.recursive = recursive;

        if (recursive) {
            System.out.format("Scanning %s ...\n", dir);
            registerAll(dir);
            System.out.println("Done.");
        } else {
            register(dir);
        }

        // enable trace after initial registration
        this.trace = true;
    }

    /**
     * Process all events for keys queued to the watcher
     */
    public void processEvents() {
        for (;;) {

            // wait for key to be signalled
            WatchKey key;
            try {
                key = watcher.take();
            } catch (InterruptedException x) {
                return;
            }

            Path dir = keys.get(key);
            if (dir == null) {
                System.err.println("WatchKey not recognized!!");
                continue;
            }

            for (WatchEvent<?> event: key.pollEvents()) {
                WatchEvent.Kind kind = event.kind();

                // TBD - provide example of how OVERFLOW event is handled
                if (kind == OVERFLOW) {
                    continue;
                }

                // Context for directory entry event is the file name of entry
                WatchEvent<Path> ev = cast(event);
                Path name = ev.context();
                Path child = dir.resolve(name);

                // print out event
                System.out.format("%s: %s\n", event.kind().name(), child);

                // if directory is created, and watching recursively, then
                // register it and its sub-directories
                if (recursive && (kind == ENTRY_CREATE)) {
                    try {
                        if (Files.isDirectory(child, NOFOLLOW_LINKS)) {
                            registerAll(child);
                        }
                    } catch (IOException x) {
                        // ignore to keep sample readbale
                    }
                }
            }

            // reset key and remove from set if directory no longer accessible
            boolean valid = key.reset();
            if (!valid) {
                keys.remove(key);

                // all directories are inaccessible
                if (keys.isEmpty()) {
                    break;
                }
            }
        }
    }

    public static void usage() {
        System.err.println("usage: java WatchDir [-r] dir");
        System.exit(-1);
    }
}

■/test-watchservice/src/apps/service/impl/service_watcher/ServiceWatcherFromEmergencyImpl.java

package apps.service.impl.service_watcher;

import java.io.IOException;
import java.nio.file.Path;

public class ServiceWatcherFromEmergencyImpl extends BaseServiceWatcherRecursive {

	public ServiceWatcherFromEmergencyImpl(Path dir, boolean recursive) throws IOException {
		super(dir, recursive);

	}

}
    

■/test-watchservice/src/apps/service/impl/service_watcher/ServiceWatcherFromNormalImpl.java

package apps.service.impl.service_watcher;

import java.io.IOException;
import java.nio.file.Path;

public class ServiceWatcherFromNormalImpl extends BaseServiceWatcherRecursive {

	public ServiceWatcherFromNormalImpl(Path dir, boolean recursive) throws IOException {
		super(dir, recursive);
	}

}
    

■/test-watchservice/src/apps/EndpointAppWatchServiceEmergency.java

package apps;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

import apps.service.impl.service_watcher.BaseServiceWatcherRecursive;
import apps.service.impl.service_watcher.ServiceWatcherFromEmergencyImpl;

public class EndpointAppWatchServiceEmergency {

	public static void main(String[] args) {
		EndpointAppWatchServiceEmergency endpointAppWatchServiceEmergency = new EndpointAppWatchServiceEmergency();
		endpointAppWatchServiceEmergency.startWatchServiceEmergency(args);
	}
	
	public void startWatchServiceEmergency(String[] args) {
        // parse arguments
        if (args.length == 0) {
        	BaseServiceWatcherRecursive.usage();
        }
        
        boolean recursive = false;
        if (args[0].equals("-r")) {
            recursive = true;

        }
        if (recursive) {
            Path dirEmergency = Paths.get(args[1]);
    		try {
    			
    			ServiceWatcherFromEmergencyImpl serviceWatcherFromEmergencyImpl = new ServiceWatcherFromEmergencyImpl(dirEmergency, recursive);
    			// 監視プロセス開始
    			serviceWatcherFromEmergencyImpl.processEvents();

    		} catch (IOException e) {
    			// TODO 自動生成された catch ブロック
    			e.printStackTrace();
    		}
        }
	}

}
    

■/test-watchservice/src/apps/EndpointAppWatchServiceNormal.java

package apps;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

import apps.service.impl.service_watcher.BaseServiceWatcherRecursive;
import apps.service.impl.service_watcher.ServiceWatcherFromNormalImpl;

public class EndpointAppWatchServiceNormal {

	public static void main(String[] args) {
		EndpointAppWatchServiceNormal endpointAppWatchServiceNormal = new EndpointAppWatchServiceNormal();
		endpointAppWatchServiceNormal.startWatchServiceNormal(args);
	}
	
	public void startWatchServiceNormal (String[] args) {
        // parse arguments
        if (args.length == 0) {
        	BaseServiceWatcherRecursive.usage();
        }
        
        boolean recursive = false;
        if (args[0].equals("-r")) {
            recursive = true;

        }
        if (recursive) {
            Path dirNormal = Paths.get(args[1]);

    		try {
    			ServiceWatcherFromNormalImpl serviceWatcherFromNormalImpl = new ServiceWatcherFromNormalImpl(dirNormal, recursive);
    			// 監視プロセス開始
    			serviceWatcherFromNormalImpl.processEvents();
    			
    		} catch (IOException e) {
    			// TODO 自動生成された catch ブロック
    			e.printStackTrace();
    		}
        }
	}

}
    

動作確認用のコード。@Testアノテーションを付けると、Eclipseに同梱されてるJUnitをビルド・パスに追加するか聞かれるので、追加します。自分は諸々の事情によりレガシーな「JUnit 4」を読み込んでます。

■/test-watchservice/test/apps/ExeEmergency.java

package apps;

public class ExeEmergency implements Runnable {
	static String[] argsEmergency = {
		    "-r"
			,"C:\\Users\\Toshinobu\\Desktop\\soft_work\\java_work\\watchService\\emergency"
	};
	@Override
	public void run() {
		new EndpointAppWatchServiceEmergency().startWatchServiceEmergency(argsEmergency);
		System.out.println("Done EndpointAppWatchServiceEmergency");

	}

}    

■/test-watchservice/test/apps/ExeNormal.java

package apps;

public class ExeNormal  implements Runnable {
	static String[] argsNormal = {
		    "-r"
			,"C:\\Users\\Toshinobu\\Desktop\\soft_work\\java_work\\watchService\\normal"
	};

	@Override
	public void run() {
		new EndpointAppWatchServiceNormal().startWatchServiceNormal(argsNormal);
		System.out.println("Done EndpointAppWatchServiceNormal");
	}
	
}
   

■/test-watchservice/test/apps/TestEndPointApp.java

package apps;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;


public class TestEndPointApp {
//	static String[] argsNormal = {
//		    "-r"
//			,"C:\\Users\\Toshinobu\\Desktop\\soft_work\\java_work\\watchService\\normal"
//	};
//	
//	static String[] argsEmergency = {
//		    "-r"
//			,"C:\\Users\\Toshinobu\\Desktop\\soft_work\\java_work\\watchService\\emergencyImpl\\data\\csv"
//	};

	@Test
	public void test() {

		ExecutorService threadPool = Executors.newFixedThreadPool(2);
//		TestEndPointApp.InnerExeNormal innerNormal = new TestEndPointApp(). new InnerExeNormal();		
//		TestEndPointApp.InnerExeEmergency InnerExeEmergency = new TestEndPointApp().new InnerExeEmergency();

        // それぞれのスレッド用のCountDownLatchを初期化
//        CountDownLatch latch1 = new CountDownLatch(1);
//        CountDownLatch latch2 = new CountDownLatch(1);
		try {
//			threadPool.submit(innerNormal);
//			threadPool.submit(InnerExeEmergency);
//			FutureTask<String> ftNormal = new FutureTask<String>(innerNormal);
//			new Thread(ftNormal).run();
//			FutureTask<String> ftEmmergency = new FutureTask<String>(InnerExeEmergency);
//			new Thread(ftEmmergency).start();
//			new Thread(new ExeNormal()).run();
//			new Thread(new ExeEmergency()).run();
			//threadPool.execute(new ExeNormal());
//			threadPool.execute(new ExeEmergency());
			

			threadPool.submit(() -> {
                System.out.println("Thread 1 is running");
                // ここに1つ目のスレッドの処理を記述
                new Thread(new ExeNormal()).run();
                // スレッド1の処理が終了しないようにスリープを追加しない
            });

			threadPool.submit(() -> {
                System.out.println("Thread 2 is running");
                // ここに2つ目のスレッドの処理を記述
                new Thread(new ExeEmergency()).run();
                // スレッド2の処理が終了しないようにスリープを追加しない
            });

            // テストが終了しないようにメインスレッドをスリープさせる(Ctrl+C で中断)
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }		
			
		} catch (Exception e) {
			// TODO 自動生成された catch ブロック
			e.printStackTrace();
		} finally {
			//threadPool.shutdown();
		}
	}
	
//	public class InnerExeNormal implements Callable<String> {
//
//		@Override
//		public String call() throws Exception {
//			new EndpointAppWatchServiceNormal().startWatchServiceNormal(argsNormal);
//			System.out.println("Done EndpointAppWatchServiceNormal");
//			return null;
//		}
//		
//	}
//	
//	public class InnerExeEmergency implements Callable<String> {
//
//		@Override
//		public String call() throws Exception {
//			new EndpointAppWatchServiceEmergency().startWatchServiceEmergency(argsEmergency);
//			System.out.println("Done EndpointAppWatchServiceEmergency");
//			return null;
//		}
//		
//	}
	
	
}
    

⇧ で、ファイルを保存して、監視対象のディレクトリを作成しておきます。

自分の場合は、監視対象のディレクトリとして、以下のようなディレクトリを作成してます。

C:\Users\Toshinobu\Desktop\soft_work\java_work\watchService>tree /f
フォルダー パスの一覧
...
C:.
│
├─emergency
│  └─data
│      └─csv
└─normal    

で、諸々の準備が整ったら、「実行(R)」>「JUnit テスト」を選択。

で、監視対象のディレクトリに適当なファイルを配置すると検知してくれます。

あとは、検知後に実施したい処理を追加していく感じになるとは思いますが、とりあえず、監視して検知はできたということで。

検知後にMessage Queueに対してメッセージを送信して、Message Queueからメッセージを受け取ってから必要な処理をする感じの構成にすれば、疎結合な感じにできるということかね。

Java標準のAPIのWatchServiceについて動作が確認できたので、良しとしておきますか。

毎回思うのは、動作確認の良い方法が思いつかんのよね...

今回のような場合、コマンドプロンプトでファイルを監視対象のディレクトリにコピーして、コンソールのログを残せば動作確認のエビデンスになるんかな?

ちなみに、Junitでの実行なので、普通の実行と変わってくるのかも知らんけど、Javaのプロセスとして認識されているのは以下で、

試しに、Junitを起動した時に生成されたであろうと思われるプロセスに対して、jpsコマンドを実施してみたところ、

C:\Eclipse-2023-06\java\8\bin\jstack.exe 23388 > C:\Users\Toshinobu\Desktop\soft_work\java_work\watchService\thread-list.txt    
2023-09-09 20:40:12
Full thread dump OpenJDK 64-Bit Server VM (25.372-b07 mixed mode):

"Thread-3" #17 daemon prio=5 os_prio=0 tid=0x000001e71c66e800 nid=0x5804 runnable [0x000000d5be9fe000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.fs.WindowsNativeDispatcher.GetQueuedCompletionStatus0(Native Method)
	at sun.nio.fs.WindowsNativeDispatcher.GetQueuedCompletionStatus(WindowsNativeDispatcher.java:1007)
	at sun.nio.fs.WindowsWatchService$Poller.run(WindowsWatchService.java:586)
	at java.lang.Thread.run(Thread.java:750)

"Thread-2" #16 daemon prio=5 os_prio=0 tid=0x000001e71c66e000 nid=0x284c runnable [0x000000d5be8ff000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.fs.WindowsNativeDispatcher.GetQueuedCompletionStatus0(Native Method)
	at sun.nio.fs.WindowsNativeDispatcher.GetQueuedCompletionStatus(WindowsNativeDispatcher.java:1007)
	at sun.nio.fs.WindowsWatchService$Poller.run(WindowsWatchService.java:586)
	at java.lang.Thread.run(Thread.java:750)

"pool-1-thread-2" #13 prio=5 os_prio=0 tid=0x000001e71c669800 nid=0x4e50 waiting on condition [0x000000d5be7fe000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000d66ad098> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
	at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
	at sun.nio.fs.AbstractWatchService.take(AbstractWatchService.java:118)
	at apps.service.impl.service_watcher.BaseServiceWatcherRecursive.processEvents(BaseServiceWatcherRecursive.java:94)
	at apps.EndpointAppWatchServiceEmergency.startWatchServiceEmergency(EndpointAppWatchServiceEmergency.java:34)
	at apps.ExeEmergency.run(ExeEmergency.java:10)
	at java.lang.Thread.run(Thread.java:750)
	at apps.TestEndPointApp.lambda$1(TestEndPointApp.java:53)
	at apps.TestEndPointApp$$Lambda$2/1940030785.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

"pool-1-thread-1" #12 prio=5 os_prio=0 tid=0x000001e71c669000 nid=0x5444 waiting on condition [0x000000d5be6ff000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000d6601df8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
	at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
	at sun.nio.fs.AbstractWatchService.take(AbstractWatchService.java:118)
	at apps.service.impl.service_watcher.BaseServiceWatcherRecursive.processEvents(BaseServiceWatcherRecursive.java:94)
	at apps.EndpointAppWatchServiceNormal.startWatchServiceNormal(EndpointAppWatchServiceNormal.java:34)
	at apps.ExeNormal.run(ExeNormal.java:11)
	at java.lang.Thread.run(Thread.java:750)
	at apps.TestEndPointApp.lambda$0(TestEndPointApp.java:46)
	at apps.TestEndPointApp$$Lambda$1/1910163204.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

"ReaderThread" #11 prio=5 os_prio=0 tid=0x000001e71c4db000 nid=0x47e0 runnable [0x000000d5be5fe000]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	- locked <0x00000000d602ee68> (a java.io.InputStreamReader)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	- locked <0x00000000d602ee68> (a java.io.InputStreamReader)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner$ReaderThread.run(RemoteTestRunner.java:152)

"Service Thread" #10 daemon prio=9 os_prio=0 tid=0x000001e71c1be000 nid=0x3db8 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #9 daemon prio=9 os_prio=2 tid=0x000001e71a40c000 nid=0x2ae4 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #8 daemon prio=9 os_prio=2 tid=0x000001e71a401000 nid=0x5acc waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #7 daemon prio=9 os_prio=2 tid=0x000001e71a3fa000 nid=0x25d0 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #6 daemon prio=9 os_prio=2 tid=0x000001e71a3fd800 nid=0x459c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x000001e71a3f5000 nid=0x830 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x000001e71a3f4000 nid=0x3580 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x000001e71a37b800 nid=0x2cb0 in Object.wait() [0x000000d5bdcfe000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x00000000d5b88f00> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
	- locked <0x00000000d5b88f00> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
	at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:188)

"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x000001e71a374800 nid=0x3b00 in Object.wait() [0x000000d5bdbfe000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x00000000d5b86b98> (a java.lang.ref.Reference$Lock)
	at java.lang.Object.wait(Object.java:502)
	at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
	- locked <0x00000000d5b86b98> (a java.lang.ref.Reference$Lock)
	at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x000001e704222800 nid=0x5a40 waiting on condition [0x000000d5bd1fe000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at apps.TestEndPointApp.test(TestEndPointApp.java:59)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:93)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:756)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)

"VM Thread" os_prio=2 tid=0x000001e71a34a800 nid=0x5be8 runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x000001e704231800 nid=0x4660 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x000001e704234000 nid=0x3720 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x000001e704235800 nid=0x4738 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x000001e704236800 nid=0x158c runnable 

"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x000001e704238800 nid=0x2828 runnable 

"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x000001e704239800 nid=0x5a58 runnable 

"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x000001e70423c800 nid=0x3df8 runnable 

"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x000001e70423d800 nid=0x10ec runnable 

"VM Periodic Task Thread" os_prio=2 tid=0x000001e71c216800 nid=0x17c8 waiting on condition 

JNI global references: 317

⇧ という感じで、マルチスレッドの処理だからなのか分からんけども、おそらく、java.exeで実行した対象のみがプロセスとして認識される感じになるんかね。

となると、個別に「.class」をjava.exeで実行してプロセスを取得するしかないんかな...

でも、jpsコマンドのプロセスを除いたとして、3つのプロセスが生成されてるっぽいのだけど、どうやって生成されてるんだろうか?

う~む、Javaにおけるプロセスよく分からん...

WatchServiceのAPIを利用して処理を実行してるmainメソッドのあるクラスをコマンドで実行したところ、それぞれのプロセスが表示されました。

実行する「.class」については、JUnitで実行してるから、「[Javaプロジェクト名]/bin」フォルダに配置されてるのだけど、Eclipse上だと「プロジェクト・エクスプローラー」じゃないと確認できない。(「パッケージ・エクスプローラー」で「[Javaプロジェクト名]/bin」フォルダが表示されないのはよく分からんです...)

コマンドプロンプトを2つ新規に起ち上げて、「[Javaプロジェクト名]/bin」フォルダの「.class」ファイルを、各々のコマンドプロンプトで実行。

実行に使う「java.exe」コマンドは、Junitの実行で使用されてるものを確認すると、

「コマンド行(M):」が、

C:\Eclipse-2023-06\java\8\bin\javaw.exe -ea -Dfile.encoding=UTF-8 -classpath C:\Eclipse-2023-06\workspace\test-watchservice\bin;C:\Eclipse-2023-06\eclipse\plugins\org.junit_4.13.2.v20211018-1956.jar;C:\Eclipse-2023-06\eclipse\plugins\org.hamcrest.core_1.3.0.v20180420-1519.jar;C:\Eclipse-2023-06\eclipse\configuration\org.eclipse.osgi\312\0\.cp;C:\Eclipse-2023-06\eclipse\configuration\org.eclipse.osgi\311\0\.cp org.eclipse.jdt.internal.junit.runner.RemoteTestRunner -version 3 -port 54340 -testLoaderClass org.eclipse.jdt.internal.junit4.runner.JUnit4TestLoader -loaderpluginname org.eclipse.jdt.junit4.runtime -classNames apps.TestEndPointApp    

となっているので、「C:\Eclipse-2023-06\java\8\bin\java.exe」を利用する感じになるかと。ご自身の環境に合わせてください。

それぞれの「.class」ファイルを、実行。

で、もう1つ新規にコマンドプロンプトを起動し、Javaのプロセスを確認。

⇧ mainメソッドが走るとJavaのプロセスが生成されるってことなんかな?

だとすると、Javaにサブプロセスって概念が無いように思えてくるんだが...

Javaのプロセスを停止するコマンドがJava側で用意されていないようなので、Windowsだと「taskkill」コマンドする感じになるらしい。

話をWatchServiceに戻すと、そもそも、

Windows環境のEclipseに同梱されているJavaだからなのか、「WindowsWatchService」が利用されてるっぽいので、Linux環境で挙動が変わってきそうですな...

まぁ、

www.oracle.com

⇧ そもそも、プラットフォーム毎に利用する「JDKJava Development Kit)」が異なるし、

Java Platform, Standard Edition, ou Java SE (anciennement Java 2 Platform, Standard Edition, ou J2SE), est une spécification de la plateforme Java d'Oracle, destinée typiquement aux applications pour poste de travail.

https://fr.wikipedia.org/wiki/Java_SE

Wikipediaの図が混乱を招くのだけど、

Oracleさんの公式のドキュメントによりますと、

www.oracle.com

⇧「JREJava Runtime Enviroment)」は「JDKJava Development Kit)」に内包されてることから、「JREJava Runtime Enviroment)」もまた、プラットフォーム毎に異なってきそう、というか、参照してるJavaの標準APIが、Eclipseに同梱されてるJavaの「JREJava Runtime Enviroment)」のものになっているので影響受けてるってことですかね。

とりあえず、Oracleさん、Javaのプロセスが生成される仕組みについてアーキテクチャ図を公開して欲しいかな...

Javaにおいてsub processは、

docs.oracle.com

ProcessBuilder.start()メソッドやRuntime.execメソッドはネイティブのプロセスを作成し、Processのサブクラスのインスタンスを返しますが、これを使えば、そのプロセスを制御したり情報を取得したりできます。Processクラスは、プロセスからの入力、プロセスへの出力、プロセス完了の待機、プロセス終了状態の確認、およびプロセスの破棄(終了)を実行するための各メソッドを提供します。

https://docs.oracle.com/javase/jp/8/docs/api/java/lang/Process.html

たとえば、ネイティブなウィンドウ処理プロセス、デーモン・プロセス、Microsoft Windows環境でのWin16/DOSプロセス、あるいはシェル・スクリプトといったプロセスです。

https://docs.oracle.com/javase/jp/8/docs/api/java/lang/Process.html

デフォルトでは、作成されたサブプロセスは、自身の端末またはコンソールを持ちません。その標準入出力(つまり標準入力、標準出力、標準エラー)の処理はすべて親プロセスにリダイレクトされますが、それらの情報にアクセスするには、メソッドgetOutputStream()getInputStream()、およびgetErrorStream()を使って取得されるストリームを使用します。親プロセスはこれらのストリームを使って、サブプロセスに入力を送ったり、サブプロセスからの出力を取得したりします。ネイティブなプラットフォームには標準入出力ストリームに使うバッファのサイズが限られるものもあるので、サブプロセスの入力ストリームの書き込みあるいはストリーム出力の読込みが失敗した場合、サブプロセスはブロックされるか、デッドロック状態になる可能性があります。

https://docs.oracle.com/javase/jp/8/docs/api/java/lang/Process.html

⇧ 無理やり作る感じでないと生成されないってことなんだろうか?

というか、ドキュメントで言っている「サブプロセス」ってJavaのプロセスと関係ない、一般的なプロセスにおけるサブプロセスのことを言ってたりしてないか?

何て言うか、Linuxのpstreeコマンドで得られる情報みたいに、Javaのプロセスが階層構造になることが有り得るのかが知りたいだけなんだけどな...

結局、よく分からんではないか...

毎度モヤモヤ感が半端ない...

今回はこのへんで。