I'm trying to save an event using the flow below. (The repo is reactive; this is just sample code for testing. I'm new to reactive programming and am using io.projectreactor (3.3).)

  1. Validate an event; on failure, write to history
  2. If validation succeeds, write the event to the repo; on any failure, write to history
  3. If validation fails, write to history
  4. Induce some failures to simulate error conditions
import reactor.core.publisher.Mono;

public class MyTest {

    static int counter = 0;


    public static void main(String args[]) throws InterruptedException
    {
        String array[] = {"1","2","3","4",null,"5"};
        for(int i =0; i < 5; i++)
        {
            System.out.println("input:: "+array[i]);
            new MyTest().createMessage(array[i]);
            counter++;
            Thread.sleep(500);
        }
    }
    private void createMessage(String input)
    {
        new MyTest().onMessage(input)
                .doOnSuccess(s -> System.out.println("----done::success-----"))
                .onErrorResume(e ->
                {System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
                return Mono.empty();})
                .subscribe();
    }

    private Mono<String> onMessage(String input)
    {
        return Mono.create(sink -> {
            validate()
                    .onErrorResume(e -> {
                        System.out.println("error onMessage:: fail to validate");
                        sink.error(e);
                        return Mono.error(e);
                    })
                    .flatMap(a -> processObject(input))
                    .flatMap(h -> {
                        System.out.println("success onMessage :: save history");
                        new Service().saveHistory(input, false);
                        sink.success();
                        return Mono.just(h);
                      })
                     .subscribe();
        });

    }

    private Mono<String> processObject(String input)
    {
           return Mono.create(sink -> {
               new Service().saveEvent(input).flatMap(a -> {
                   System.out.println("success processObject");
                   sink.success(a);
                   return Mono.just(a);
               }).onErrorResume(e -> {
                   new Service().saveHistory(input, true);
                   System.out.println("error processObject");
                   sink.error(e);
                   return Mono.error(e);
               }).subscribe();
           });

    }

    private Mono<String> validate()
    {
        counter++;
        return Mono.create(sink -> {
            if (counter % 3 == 0)
            {
                sink.error(new RuntimeException("Validate method error"));
                return;
            }
            sink.success("validate is done ");
            return;
        });

    }


}

Service class

public class Service {


    public Mono<String> saveEvent(String id)
    {
        return save(id)
                .onErrorResume(e -> {
                    System.out.println("Error in save event");
                    return Mono.error(e);
                }).doOnNext(e -> System.out.println("save event"));

    }

    public Mono<String> saveHistory(String id, boolean error)
    {
        return save(id)
                .onErrorResume(e -> {
                    System.out.println("Error in save history");
                    return Mono.error(e);
                }).doOnNext(e -> System.out.println("save history"));

    }

    public Mono<String> save(String id)
    {

        if (id  == null)
        {
            throw new RuntimeException("Error saving");
        }

        return Mono.just("save success");
    }

}

I get this exception:

---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Validate method error
Caused by: java.lang.RuntimeException: Validate method error
    at sample.MyTest.lambda$validate$9(MyTest.java:77)
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
    at sample.MyTest.lambda$onMessage$5(MyTest.java:49)
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
    at sample.MyTest.createMessage(MyTest.java:30)
    at sample.MyTest.main(MyTest.java:18)

Updated working code, based on @Michael Berry's comments:

 public static void main(String args[]) throws InterruptedException
    {
        String array[] = {"1","2","3","4",null,"5"};
        for(int i =0; i < 5; i++)
        {
            System.out.println("input:: "+array[i]);
            new MyTest().createMessage(array[i]);
            counter++;
            Thread.sleep(500);
        }
    }
    private void createMessage(String input)
    {
        new MyTest().onMessage(input)
                .doOnSuccess(s -> System.out.println("----done::success-----"))
                .onErrorResume(e ->
                {
                    System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
                   return Mono.empty();
                })
                .subscribe();
    }

    private Mono<String> onMessage(String input) {
        return validate()
                .onErrorResume(e -> {
                    System.out.println("error onMessage:: fail to validate");
                    return Mono.error(e);
                })
                .flatMap(a -> processObject(input))
                .flatMap(h -> {
                    System.out.println("success onMessage :: save history");
                    new Service().saveHistory(input, false);
                    return Mono.just(h);
                });
    }

    private Mono<String> processObject(String input)
    {
           return new Service().saveEvent(input).flatMap(a -> {
                   System.out.println("success processObject");
                   return Mono.just(a);
               }).onErrorResume(e -> {
                   new Service().saveHistory(input, true);
                   System.out.println("error processObject");
                    return Mono.error(e);
               });

    }

    private Mono<String> validate()
    {
        counter++;

            if (counter % 3 == 0)
            {
                return Mono.error(new RuntimeException("Validate method error"));

            }
          return  Mono.just("validate is done ");



    }

Result

save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Prasanna Talakanti July 1, 2020, 01:52

1 answer

Best answer

You're getting the error here because of your implementation of onMessage(), which is a bit odd:

  • You're wrapping a Mono in Mono.create(), which there's no reason to do;
  • You're subscribing to this inner publisher yourself - that's almost always the wrong thing to do, and won't necessarily do what you expect (subscribing to publishers should be handled by the framework, not your code). In this case, the key problem is that the inner publisher is treated separately rather than as part of your reactive chain, so your error handling may not map to the inner publisher as you expect;
  • Your onErrorResume() call on this inner publisher itself returns an error, and there's no other error handling on this inner publisher - hence the error is unhandled, and it then prints the stack trace you're seeing.
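
To illustrate the last two points, here is a minimal sketch (assuming reactor-core is on the classpath) of what happens when the inner publisher is returned as part of the chain instead of being subscribed to separately. The names ChainedErrorDemo, failingValidate() and run() are hypothetical and exist only for this example:

```java
import reactor.core.publisher.Mono;

public class ChainedErrorDemo {

    // Hypothetical stand-in for validate(): always fails.
    public static Mono<String> failingValidate() {
        return Mono.error(new RuntimeException("Validate method error"));
    }

    // The inner publisher is composed with flatMap and returned,
    // never subscribed to inside this method.
    public static Mono<String> onMessage(String input) {
        return failingValidate()
                .flatMap(ok -> Mono.just("saved " + input));
    }

    // The only subscription happens at the outer edge (block() here),
    // so the outer onErrorResume sees the validation failure.
    public static String run(String input) {
        return onMessage(input)
                .onErrorResume(e -> Mono.just("handled: " + e.getMessage()))
                .block();
    }
}
```

Calling ChainedErrorDemo.run("1") returns "handled: Validate method error": because there is a single chain, the error travels downstream to the one handler that was attached to it.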

Instead, you most likely want your onMessage() method to read as follows:

private Mono<String> onMessage(String input) {
    return validate()
            .onErrorResume(e -> {
                System.out.println("error onMessage:: fail to validate");
                return Mono.error(e);
            })
            .flatMap(a -> processObject(input))
            .flatMap(h -> {
                System.out.println("success onMessage :: save history");
                new Service().saveHistory(input, false);
                return Mono.just(h);
            });
}

...without Mono.create() (which is only really meant to be used with non-reactor callback APIs, for compatibility purposes). Your output with this change then reads as follows:

input:: 1
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
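
For contrast, this is roughly the kind of case where Mono.create() does fit: adapting a callback-style API into a Mono. It is only a sketch, assuming reactor-core is on the classpath; LegacyStore and its save(id, onSuccess, onFailure) method are hypothetical and stand in for whatever non-reactive client you might need to bridge:

```java
import java.util.function.Consumer;
import reactor.core.publisher.Mono;

public class CallbackBridge {

    // Hypothetical callback-style API (not part of Reactor or of the question's code).
    public static class LegacyStore {
        public void save(String id, Consumer<String> onSuccess, Consumer<Throwable> onFailure) {
            if (id == null) {
                onFailure.accept(new RuntimeException("Error saving"));
            } else {
                onSuccess.accept("save success: " + id);
            }
        }
    }

    // Mono.create() hands the callbacks a sink; no Mono is wrapped and
    // nothing is subscribed to inside the lambda.
    public static Mono<String> save(LegacyStore store, String id) {
        return Mono.create(sink -> store.save(id, sink::success, sink::error));
    }
}
```

The resulting Mono can then be composed like any other, for example with flatMap(a -> CallbackBridge.save(store, input)), and a failure reported through the callback reaches whatever error handling is attached downstream.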
Michael Berry July 1, 2020, 06:52