Saya mencoba memvalidasi JSONObject dengan seperangkat aturan jika json cocok dengan seperangkat aturan apakah itu akan mengembalikan aturan yang cocok dan JSONObject jika tidak itu akan mengembalikan JSONObject ke Sideoutput semua ini diproses dalam ProcessFuntion, saya mendapatkan yang utama output tetapi tidak dapat menangkap output samping

SideOutput Stream didefinisikan seperti di bawah ini

public final static OutputTag<org.json.JSONObject> unMatchedJSONSideOutput = new OutputTag<org.json.JSONObject>(
            "unmatched-side-output") {};

ProcessFunction didefinisikan seperti di bawah ini

public class RuleFilter extends ProcessFunction<Tuple2<String,org.json.JSONObject>,Tuple2<String,org.json.JSONObject>> {
@Override
    public void processElement(Tuple2<String, org.json.JSONObject> value,
            ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>>.Context ctx,
            Collector<Tuple2<String, org.json.JSONObject>> out) throws Exception {

        if(this.value.matches((value.f1))) {
        out.collect(new Tuple2<String, org.json.JSONObject>(value.f0,value.f1));
        }else {
            ctx.output(RuleMatching.unMatchedJSONSideOutput,value.f1);
        }
    }
}

Saya mencetak output Datastream utama seperti di bawah ini

    DataStream<Tuple2<String, org.json.JSONObject>> matchedJSON =
                            inputSignal.map(new MapFunction<org.json.JSONObject, Tuple2<String, org.json.JSONObject>>() {
                                @Override
                                public Tuple2<String, org.json.JSONObject> map(org.json.JSONObject input) throws Exception {
                                    return new Tuple2<>(value, input);
                                }
                            }).process(new RuleFilter()).print("MatchedJSON=>");

matchedJSON .print("matchedJSON=>");

Saya mencetak Sideoutput seperti di bawah ini

DataStream<org.json.JSONObject> unmatchedJSON =
                        ((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(new MapFunction<Tuple2<String, org.json.JSONObject>, org.json.JSONObject>() {
                            @Override
                            public org.json.JSONObject map(Tuple2<String, org.json.JSONObject> value) throws Exception {
                                return value.f1;
                            }
                        })).getSideOutput(unMatchedJSONSideOutput );

                unmatchedJSON.print("unmatchedJSON=>");

Aliran Utama mencetak output tetapi output samping tidak mencetak untuk json yang tidak valid, tolong bantu dalam memecahkan masalah

2
YRK 25 Mei 2020, 18:14

1 menjawab

Jawaban Terbaik

Masalahnya ada di sini:

DataStream<org.json.JSONObject> unmatchedJSON =
    ((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(...))
    .getSideOutput(unMatchedJSONSideOutput);

Anda harus memanggil getSideOutput langsung di matchedJSON, bukan pada hasil penerapan MapFunction padanya. Hanya ProcessFunction yang dapat memiliki output sampingan, dan harus datang langsung dari ProcessFunction. Anda menipu kompiler untuk menerima ini dengan mentransmisikan aliran keluaran dari peta, tetapi runtime tidak dapat melakukan sesuatu yang berarti dengan ini.

2
David Anderson 26 Mei 2020, 15:53