Saya mencoba memulai struktur proyek dasar di mana beberapa aplikasi boot musim semi akan berbagi sumber daya menggunakan kurator Apache.

Saya mengikuti panduan yang ditentukan dalam dokumentasi tetapi mengubah node tidak memicu peristiwa apa pun

Tolong, bantuan apa pun akan dihargai

Pom.xml

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Docker-compose.yaml

version: '3.1'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

Pencipta

package com.training.zoo.sss;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.System.out;

@Service
public class Client {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    public Client() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .namespace("base")
                        .build();
        client.start();

        Stat stat1 = client.checkExists().creatingParentContainersIfNeeded().forPath(ZK_PATH);
        if (stat1 == null) {
            client.create().forPath(ZK_PATH, "sometdata".getBytes());
        }

        byte[] bytes = client.getData().forPath(ZK_PATH);
        out.println(new String(bytes, StandardCharsets.UTF_8));

        // Update value every half second
        final AtomicInteger i = new AtomicInteger(0);
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        exec.scheduleAtFixedRate(new Runnable(){
            @Override
            public void run(){
                i.set(i.get()+1);
                System.out.println(i);
                try {
                    client.setData().forPath(ZK_PATH, ("init_" + i ).getBytes());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}

Pendengar

package com.training.bookstore.request;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;

@Service
public class Watcher2 {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    public Watcher2() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .namespace("base")
                        .build();
        client.start();

        PathChildrenCache watcher = new PathChildrenCache(
                client, ZK_PATH, true    // if cache data
        );

        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.NORMAL);
        System.out.println("Register zk watcher successfully!");
    }
}

Terima kasih

0
regenbar 9 Mei 2021, 21:58

1 menjawab

Jawaban Terbaik

Jadi ya nama kelas PathChildrenCache adalah hadiah mati. Kedengarannya aneh https://www.youtube.com/watch?v=nZcRU0Op5P4

Jika saya memublikasikan ke /path1/path2 dan saya mendengarkan path /path1/path2 apakah saya benar-benar mendengarkan path1 atau path2? Peringatan spoiler: Anda sedang mendengarkan path2 yang merupakan folder dan bukan simpul yang Anda pikir telah Anda buat

Solusinya adalah Jika produsen memproduksi pada jalur yang ditentukan

    String connectionInfo = "127.0.0.1:2181";
    String PATH = "/someapp/somemodule/whatever";

Di kelas Watcher setel jalur ke "induk" dari simpul itu

    String connectionInfo = "127.0.0.1:2181";
    String PATH = "/someapp/somemodule";

Dan jika Anda perlu mendengarkan subnode/subfolder dari jalur produser Anda, daripada menggunakan PathChildrenCache gunakan TreeCache

0
regenbar 9 Mei 2021, 20:09