Saya mencoba membuat aplikasi sederhana di mana aplikasi akan menggunakan pesan Kafka, lakukan beberapa transformasi cql dan publikasikan ke Kafka dan di bawah ini adalah kodenya:

JAVA: 1,8 Flink: 1,13 Scala: 2,11 flink-siddhi: 2.11-0.2.2-SNAPSHOT

Saya menggunakan perpustakaan: https://github.com/haoch/flink-siddhi

masukkan json ke Kafka:

{
   "awsS3":{
      "ResourceType":"aws.S3",
      "Details":{
         "Name":"crossplane-test",
         "CreationDate":"2020-08-17T11:28:05+00:00"
      },
      "AccessBlock":{
         "PublicAccessBlockConfiguration":{
            "BlockPublicAcls":true,
            "IgnorePublicAcls":true,
            "BlockPublicPolicy":true,
            "RestrictPublicBuckets":true
         }
      },
      "Location":{
         "LocationConstraint":"us-west-2"
      }
   }
}

kelas utama:

public class S3SidhiApp {
    public static void main(String[] args) {
        internalStreamSiddhiApp.start();
        //kafkaStreamApp.start();
    }
}

Kelas aplikasi:

package flinksidhi.app;

import com.google.gson.JsonObject;
import flinksidhi.event.s3.source.S3EventSource;

import io.siddhi.core.SiddhiManager;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import org.json.JSONObject;


import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static flinksidhi.app.connector.Consumers.createInputMessageConsumer;
import static flinksidhi.app.connector.Producer.*;

public class internalStreamSiddhiApp {

    private static final String inputTopic = "EVENT_STREAM_INPUT";
    private static final String outputTopic = "EVENT_STREAM_OUTPUT";
    private static final String consumerGroup = "EVENT_STREAM1";
    private static final String kafkaAddress = "localhost:9092";
    private static final String zkAddress = "localhost:2181";

    private static final String S3_CQL1 = "from inputStream select * insert into temp";
    private static final String S3_CQL = "from inputStream select json:toObject(awsS3) as obj insert into temp;" +
            "from temp select json:getString(obj,'$.awsS3.ResourceType') as affected_resource_type," +
            "json:getString(obj,'$.awsS3.Details.Name') as affected_resource_name," +
            "json:getString(obj,'$.awsS3.Encryption.ServerSideEncryptionConfiguration') as encryption," +
            "json:getString(obj,'$.awsS3.Encryption.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm') as algorithm insert into temp2; " +
            "from temp2 select  affected_resource_name,affected_resource_type, " +
            "ifThenElse(encryption == ' ','Fail','Pass') as state," +
            "ifThenElse(encryption != ' ' and algorithm == 'aws:kms','None','Critical') as severity insert into outputStream";


    public static void start(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //DataStream<String> inputS = env.addSource(new S3EventSource());

        //Flink kafka stream consumer
        FlinkKafkaConsumer<String> flinkKafkaConsumer =
                createInputMessageConsumer(inputTopic, kafkaAddress,zkAddress, consumerGroup);

        //Add Data stream source -- flink consumer
        DataStream<String> inputS = env.addSource(flinkKafkaConsumer);
        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

        cep.registerExtension("json:toObject", io.siddhi.extension.execution.json.function.ToJSONObjectFunctionExtension.class);
        cep.registerExtension( "json:getString", io.siddhi.extension.execution.json.function.GetStringJSONFunctionExtension.class);
        cep.registerStream("inputStream", inputS, "awsS3");


        inputS.print();

        System.out.println(cep.getDataStreamSchemas());
        //json needs extension jars to present during runtime.
        DataStream<Map<String,Object>> output = cep
                .from("inputStream")
                .cql(S3_CQL1)
                .returnAsMap("temp");


        //Flink kafka stream Producer
        FlinkKafkaProducer<Map<String, Object>> flinkKafkaProducer =
                createMapProducer(env,outputTopic, kafkaAddress);

        //Add Data stream sink -- flink producer
        output.addSink(flinkKafkaProducer);
        output.print();


        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

Kelas konsumen:

package flinksidhi.app.connector;


import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.json.JSONObject;

import java.util.Properties;
public class Consumers {
    public static FlinkKafkaConsumer<String> createInputMessageConsumer(String topic, String kafkaAddress, String zookeeprAddr, String kafkaGroup ) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("zookeeper.connect", zookeeprAddr);
        properties.setProperty("group.id",kafkaGroup);
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
                topic,new SimpleStringSchema(),properties);
        return consumer;
    }
}

Kelas produser:

package flinksidhi.app.connector;

import flinksidhi.app.util.ConvertJavaMapToJson;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.json.JSONObject;

import java.util.Map;

public class Producer {

    public static FlinkKafkaProducer<Tuple2> createStringProducer(StreamExecutionEnvironment env, String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<Tuple2>(kafkaAddress, topic, new AverageSerializer());
    }

    public static FlinkKafkaProducer<Map<String,Object>> createMapProducer(StreamExecutionEnvironment env, String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<Map<String,Object>>(kafkaAddress, topic, new SerializationSchema<Map<String, Object>>() {
            @Override
            public void open(InitializationContext context) throws Exception {

            }

            @Override
            public byte[] serialize(Map<String, Object> stringObjectMap) {
                String json = ConvertJavaMapToJson.convert(stringObjectMap);
                return json.getBytes();
            }
        });
    }
}

Saya telah mencoba banyak hal tetapi kode tempat CQL dipanggil tidak pernah dipanggil dan bahkan tidak memberikan kesalahan apa pun, tidak yakin di mana kesalahannya.

Hal yang sama jika saya membuat sumber aliran internal dan menggunakan input json yang sama untuk kembali sebagai string yang berfungsi.

0
Deepanjan Majumdar 23 Mei 2021, 07:31

2 jawaban

Jawaban Terbaik

Tebakan awal: jika Anda menggunakan waktu genap, apakah Anda yakin telah mendefinisikan tanda air dengan benar? Sebagaimana dinyatakan dalam dokumen :

(...) elemen yang masuk awalnya dimasukkan ke dalam buffer di mana elemen diurutkan dalam urutan menaik berdasarkan stempel waktu mereka, dan ketika tanda air tiba, semua elemen dalam buffer ini dengan stempel waktu yang lebih kecil dari tanda air diproses (. ..)

Jika ini tidak membantu, saya akan menyarankan untuk menguraikan/menyederhanakan pekerjaan seminimal mungkin, misalnya hanya operator sumber dan beberapa elemen pencetakan/logging wastafel yang naif. Dan jika berhasil, mulailah menambahkan kembali operator satu per satu. Anda juga bisa mulai dengan menyederhanakan pola CEP Anda sebanyak mungkin.

0
Piotr Nowojski 25 Mei 2021, 07:59

Pertama-tama terima kasih banyak @Piotr Nowojski , hanya karena pointer kecil Anda yang tidak peduli berapa kali saya merenungkan tentang waktu acara, itu tidak muncul dalam pikiran saya. Jadi ya saat men-debug dua kasus:

  1. Dengan sumber data internal , di mana ia berhasil diproses, saat men-debug aliran , saya mengidentifikasi bahwa itu sedang memproses tanda air setelah memproses data, tetapi tidak menangkap saya bahwa entah bagaimana mengelola waktu peristiwa data secara implisit.
  2. Dengan kafka sebagai sumber data , ketika saya sedang men-debug, saya dapat dengan jelas melihat bahwa itu tidak memproses tanda air apa pun dalam aliran, tetapi tidak terpikir oleh saya bahwa , itu terjadi karena waktu acara dan tanda air tidak ditangani dengan benar.

Hanya menambahkan satu baris kode dalam kode aplikasi yang saya pahami dari cuplikan kode Flink di bawah ini:

@deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link
     *     TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling
     *     event-time support anymore. Explicitly using processing-time windows and timers works in
     *     event-time mode. If you need to disable watermarks, please use {@link
     *     ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link
     *     TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link
     *     WatermarkStrategy}. If you are using generic "time window" operations (for example {@link
     *     org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
     *     that change behaviour based on the time characteristic, please use equivalent operations
     *     that explicitly specify processing time or event time.
     */

Saya mengetahui bahwa secara default flink mempertimbangkan waktu acara dan untuk itu tanda air perlu ditangani dengan benar yang tidak saya lakukan, jadi saya menambahkan tautan di bawah ini untuk mengatur karakteristik waktu lingkungan eksekusi flink:

Env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Dan kaboom ... itu mulai berfungsi, sementara ini sudah usang dan membutuhkan beberapa konfigurasi lain, tetapi terima kasih banyak, itu adalah penunjuk yang bagus dan banyak membantu saya dan saya memecahkan masalah ..

Terima kasih sekali lagi @Piotr Nowojski

1
Deepanjan Majumdar 25 Mei 2021, 10:00