Katakanlah saya memiliki JSON String

{"targetTable": "table", "primaryKey": {"A": "a"}, "payload": {"A": "a", "B": "b"}}

Saya tertarik untuk membuat serial ke org.apache.beam.sdk.values.Row, menggunakan RowCoder.of(schema), skema yang didefinisikan seperti di bawah ini

import java.io.InputStream;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.schemas.Schema;

Schema schema1 = Schema.builder()
    .addStringField("A")
    .build();

Schema schema2 = Schema.builder()
    .addStringField("A")
    .addStringField("B")
    .build();

Schema schema = Schema.of(
    Schema.Field.of("targetTable", Schema.FieldType.STRING),
    Schema.Field.of("primaryKey", Schema.FieldType.row(schema1)),
    Schema.Field.of("payload", Schema.FieldType.row(schema2)));

String jsonString = "{\"targetTable\": \"table\", \"primaryKey\": {\"A\": \"a\"}, \"payload\": {\"A\": \"a\", \"B\": \"b\"}}"

InputStream input = ???;
Row row = RowCoder.of(schema).decode(input);

// assert(row.getString("targetTable") == "hello");
// assert(row.getRow("primaryKey").getSchema().equivalent(schemaA1));
// assert(row.getRow("payload").getSchema().equivalent(schemaA0));

Bagaimana saya bisa membangun InputStream dengan benar untuk mengubah struktur JSON bersarang saya menjadi Beam Row? Saya juga mencoba ini tetapi saya bingung dan tidak yakin tentang cara membangun byte[] dengan benar.

InputStream input = new ByteArrayInputStream(jsonString.getBytes(StandardCharsets.UTF_8));

Terima kasih.

0
Michel Hua 18 April 2021, 11:20

1 menjawab

Jawaban Terbaik

Anda bisa menggunakan JsonToRow untuk ini yang melakukan deserialisasi string dan pengkodean Schema untuk Anda. Setelah mendefinisikan Schema yang menyerupai struktur JSON Anda.

Berhati-hatilah karena beberapa tipe data seperti Schema.TypeName.DATETIME tidak didukung, Anda mungkin perlu menggunakan Schema.TypeName.STRING sebagai gantinya.

import org.apache.beam.sdk.transforms.JsonToRow;

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(schema))

Lihat https://cloud.google.com/architecture/e-commerce /patterns/converting-json

0
Michel Hua 18 April 2021, 13:02