0


zookeeper

package com.citi.eqriskvolanalytics.hubblecommon.zookeeper;

import com.citi.eqriskvolanalytics.hubblecommon.utility.Status;
import com.gemstone.bp.edu.emory.mathcs.backport.java.util.Collections;
import com.google.common.base.Strings;
import com.google.common.primitives.Ints;
import com.google.gson.Gson;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.nodes.PersistentTtlNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;

import static com.google.common.collect.Lists.newArrayList;
import static java.util.Collections.emptySet;

import java.util.stream.Stream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterOutputStream;

import static java.util.stream.Collectors.toList;

public class ZooKeeperActual implements ZooKeeper {
/** Time-to-live in milliseconds (24 hours) passed to {@code withTtl} for the batch and EMS nodes. */
private static final long TTL_24_HOURS = 1000 * 60 * 60 * 24;
private final Logger logger = LoggerFactory.getLogger(getClass());
/** Curator client through which every ZooKeeper call in this class is made. */
private final CuratorFramework framework;
/** Leader latch on "/schedule-leadership"; queried by {@code hasScheduleLeadership()}. */
private final LeaderLatch scheduleLeaderLatch;
/** Node that receives the capacity value in {@code updateAvailableCapacityInLastHour}. */
private PersistentTtlNode capacityNode;
/** Path of this instance's capacity node ("/capacity/" + host); only set by the public constructor. */
private String capacityPath;
/** Path of the EMS statistics node; only set by the public constructor. */
private String emsPath;
/** JSON (de)serializer for the failure and EMS records stored in ZooKeeper. */
private final Gson gson = new Gson();

public ZooKeeperActual(String connectionString, Status status, String namespace) {
    try {
        var builder = CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(new ExponentialBackoffRetry(1000, 29));
        framework = Strings.isNullOrEmpty(namespace) ? builder.build() : builder.namespace(namespace).build();
        framework.start();

        capacityPath = "/capacity/" + status.getHost();
        logger.info("Creating capacity node with path: {}", capacityPath);
        capacityNode = new PersistentTtlNode(framework, capacityPath, 30_000, Ints.toByteArray(0));
        capacityNode.start();

        emsPath = "/ems/statistics";

        scheduleLeaderLatch = new LeaderLatch(framework, "/schedule-leadership");
        scheduleLeaderLatch.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

/**
 * Wraps an existing Curator client.
 *
 * <p>Unlike the public constructor, nothing is started here: the client is used as given and
 * neither the capacity node nor the leader latch has {@code start()} called on it.
 *
 * @param framework Curator client to use for all operations
 */
protected ZooKeeperActual(CuratorFramework framework) {
    this.framework = framework;
    // Initialise both paths so the capacity log messages and the EMS read/write methods
    // do not operate on a null path for instances created through this constructor.
    capacityPath = "/capacity/tmp";
    emsPath = "/ems/statistics";
    capacityNode = new PersistentTtlNode(framework, capacityPath, 30_000, Ints.toByteArray(0));
    scheduleLeaderLatch = new LeaderLatch(framework, "/schedule-leadership");
}

/**
 * Does nothing. The public constructor already starts the Curator client, the capacity node
 * and the leader latch, so there is no work left for this method.
 */
@Override
public void start() {
}

/**
 * Shuts down the leader latch and the capacity node (logging any close failure), then closes
 * the Curator client.
 */
@Override
public void stop() {
    for (Closeable recipe : new Closeable[] {scheduleLeaderLatch, capacityNode}) {
        safeClose(recipe);
    }
    framework.close();
}

/**
 * Closes the given resource, logging instead of propagating any failure.
 *
 * @param closeable resource to close; {@code null} is ignored
 */
private void safeClose(Closeable closeable) {
    if (closeable == null) {
        // Nothing to close; also avoids dereferencing null while building the log message below.
        return;
    }
    try {
        closeable.close();
    } catch (Exception e) {
        // The exception is the trailing argument without a placeholder of its own, so the
        // logger records it as the throwable (including the stack trace).
        logger.warn("Exception closing {}", closeable, e);
    }
}

/**
 * Reads the data stored at {@code path} and decodes it as UTF-8 text.
 *
 * @param path node to read
 * @return the node's data as a string
 * @throws RuntimeException unchecked failures are rethrown as-is, checked ones are wrapped
 */
@Override
public String getString(String path) {
    final byte[] raw;
    try {
        raw = framework.getData().forPath(path);
    } catch (Exception failure) {
        throw (failure instanceof RuntimeException)
                ? (RuntimeException) failure
                : new RuntimeException(failure);
    }
    return new String(raw, StandardCharsets.UTF_8);
}

/**
 * Stores {@code value} (UTF-8 encoded) at {@code path}, creating the node or overwriting its data.
 *
 * @param path  node to write
 * @param value text to store
 * @return the value returned by Curator's {@code forPath} call
 * @throws RuntimeException unchecked failures are rethrown as-is, checked ones are wrapped
 */
@Override
public String setString(String path, String value) {
    final byte[] payload = value.getBytes(StandardCharsets.UTF_8);
    try {
        return framework.create().orSetData().forPath(path, payload);
    } catch (Exception failure) {
        throw (failure instanceof RuntimeException)
                ? (RuntimeException) failure
                : new RuntimeException(failure);
    }
}

/**
 * @return whether the schedule leader latch currently reports leadership for this instance
 */
@Override
public boolean hasScheduleLeadership() {
    final boolean leader = scheduleLeaderLatch.hasLeadership();
    return leader;
}

/**
 * Creates the node for {@code batchId}, or rewrites it if it already exists, using a
 * 24-hour TTL node mode.
 *
 * @param batchId batch whose node is touched
 * @throws RuntimeException unchecked failures are rethrown as-is, checked ones are wrapped
 */
@Override
public void updateBatchTimestamp(String batchId) {
    try {
        var upsert = framework.create()
                .orSetData()
                .withTtl(TTL_24_HOURS)
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.PERSISTENT_WITH_TTL);
        upsert.forPath(getPathForBatch(batchId));
    } catch (Exception failure) {
        throw (failure instanceof RuntimeException)
                ? (RuntimeException) failure
                : new RuntimeException(failure);
    }
}

/**
 * Looks up when the node for {@code batchId} was last modified.
 *
 * @param batchId batch to look up
 * @return the node's modification time, or empty when the node does not exist
 * @throws RuntimeException unchecked failures are rethrown as-is, checked ones are wrapped
 */
@Override
public Optional<Instant> getLastBatchTimestamp(String batchId) {
    try {
        var nodeStat = framework.checkExists().forPath(getPathForBatch(batchId));
        if (nodeStat == null) {
            return Optional.empty();
        }
        return Optional.of(Instant.ofEpochMilli(nodeStat.getMtime()));
    } catch (Exception failure) {
        throw (failure instanceof RuntimeException)
                ? (RuntimeException) failure
                : new RuntimeException(failure);
    }
}

/**
 * Lists the ids of all batches that currently have a node under "/batch".
 *
 * @return a new mutable set of child names, or an immutable empty set when "/batch" does not exist
 * @throws RuntimeException wrapping any other failure
 */
@Override
public Set<String> getCurrentTotemBatches() {
    try {
        // Read the children directly rather than checking for existence first: the node can
        // disappear between the two calls, and this also saves a round trip.
        return new HashSet<>(framework.getChildren().forPath("/batch"));
    } catch (KeeperException.NoNodeException ignored) {
        // "/batch" has not been created (or was removed) - there are no batches.
        return emptySet();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

/**
 * Deletes the node for {@code batchId} together with any children.
 *
 * @param batchId batch to remove
 * @throws RuntimeException wrapping any failure raised by the delete
 */
@Override
public void clearTotemBatch(String batchId) {
    try {
        var removal = framework.delete().deletingChildrenIfNeeded();
        removal.forPath(getPathForBatch(batchId));
    } catch (Exception cause) {
        throw new RuntimeException(cause);
    }
}

/**
 * Deletes every batch node under "/batch", including their children.
 *
 * <p>When "/batch" does not exist there is nothing to clear and the method returns normally.
 *
 * @throws RuntimeException wrapping (once) the first failure raised while listing or deleting
 */
@Override
public void clearAllTotemBatches() {
    List<String> batchIds;
    try {
        batchIds = framework.getChildren().forPath("/batch");
    } catch (KeeperException.NoNodeException ignored) {
        // No "/batch" node means no batches to clear.
        return;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // Plain loop instead of forEach + lambda so a failure is wrapped once, not twice.
    for (String batchId : batchIds) {
        try {
            framework.delete()
                    .deletingChildrenIfNeeded()
                    .forPath(getPathForBatch(batchId));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

/**
 * Creates (or rewrites) the "end" marker node beneath the batch node, using a 24-hour TTL
 * node mode.
 *
 * @param batchId batch to mark as finished
 * @throws RuntimeException unchecked failures are rethrown as-is, checked ones are wrapped
 */
@Override
public void writeEndOfBatch(String batchId) {
    try {
        var upsert = framework.create()
                .orSetData()
                .withTtl(TTL_24_HOURS)
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.PERSISTENT_WITH_TTL);
        upsert.forPath(getPathForBatch(batchId) + "/end");
    } catch (Exception failure) {
        throw (failure instanceof RuntimeException)
                ? (RuntimeException) failure
                : new RuntimeException(failure);
    }
}

/**
 * Reports whether the "end" marker node exists for the batch.
 *
 * @param batchId batch to check
 * @return {@code true} when the marker node is present, i.e. the batch has finished
 * @throws RuntimeException unchecked failures are rethrown as-is, checked ones are wrapped
 */
@Override
public boolean getEndOfBatch(String batchId) {
    var endMarkerPath = getPathForBatch(batchId) + "/end";
    try {
        return framework.checkExists().forPath(endMarkerPath) != null;
    } catch (Exception failure) {
        throw (failure instanceof RuntimeException)
                ? (RuntimeException) failure
                : new RuntimeException(failure);
    }
}

/**
 * Reads the item count stored in the batch's "size" child node.
 *
 * <p>Side effect: when the batch has no "size" child, the whole batch node is deleted as an
 * old batch and 1 is returned.
 *
 * @param batchId batch to look up
 * @return the stored size, or 1 when it is missing or cannot be read (callers divide by this
 *         value, so 0 is never returned as a fallback)
 */
@Override
public int getTotalItemsInBatch(String batchId) {
    var batchPath = getPathForBatch(batchId);
    try {
        if (nodeExistsFor("size", "batch/" + batchId)) {
            return Ints.fromByteArray(framework.getData().forPath(batchPath + "/size"));
        }
        logger.info("Deleting old batch: {}", batchPath);
        framework.delete()
                .deletingChildrenIfNeeded()
                .forPath(batchPath);
        return 1;
    } catch (Exception e) {
        // Pass the exception as the throwable argument so the stack trace is kept.
        logger.error("Error getting total items in batch {}", batchId, e);
        return 1; // to avoid division by 0
    }
}

/**
 * Stores {@code size} in the batch's "size" child node, creating it (with a 24-hour TTL node
 * mode) or overwriting the existing data.
 *
 * @param batchId batch whose size is recorded
 * @param size    number of items, written as a big-endian 4-byte value via {@code Ints.toByteArray}
 * @throws RuntimeException unchecked failures are rethrown as-is, checked ones are wrapped
 */
@Override
public void updateBatchSize(String batchId, int size) {
    var sizePath = getPathForBatch(batchId) + "/size";
    try {
        var upsert = framework.create()
                .orSetData()
                .withTtl(TTL_24_HOURS)
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.PERSISTENT_WITH_TTL);
        upsert.forPath(sizePath, Ints.toByteArray(size));
    } catch (Exception failure) {
        throw (failure instanceof RuntimeException)
                ? (RuntimeException) failure
                : new RuntimeException(failure);
    }
}

/**
 * Writes {@code seconds} to this instance's capacity node, but only while this instance holds
 * schedule leadership; otherwise it just logs that nothing was pushed. Failures while writing
 * are logged and not propagated.
 *
 * @param seconds capacity value to publish
 */
@Override
public void updateAvailableCapacityInLastHour(int seconds) {
    if (!hasScheduleLeadership()) {
        logger.info("Node with path {} is not leader - not pushing capacity here", capacityPath);
        return;
    }

    try {
        logger.info("Pushing capacity of {} to {}", seconds, capacityPath);
        capacityNode.setData(Ints.toByteArray(seconds));
    } catch (Exception e) {
        logger.error("Exception updating data", e);
    }
}

/**
 * Sums the capacity values stored in every child of "/capacity".
 *
 * <p>A child that cannot be read is logged and counted as 0.
 *
 * @return the total capacity, or -1 when the children of "/capacity" cannot be listed
 */
@Override
public int getTotalCurrentCapacity() {
    try {
        int total = 0;
        for (String childName : framework.getChildren().forPath("/capacity")) {
            String childCapacityPath = "/capacity/" + childName;
            try {
                int capacityForChild = Ints.fromByteArray(framework.getData().forPath(childCapacityPath));
                logger.info("Got {} capacity from {}", capacityForChild, childCapacityPath);
                total += capacityForChild;
            } catch (Exception e) {
                // Unreadable child contributes nothing to the total.
                logger.warn("Exception while fetching capacity path {}: {}", childCapacityPath, e.getMessage());
            }
        }

        logger.info("Current total capacity: {}", total);
        return total;
    } catch (Exception e) {
        logger.error("Exception getting children", e);
        return -1;
    }
}

/**
 * Records a failure under the "lastFailureNode" parent.
 *
 * @param lastFailureInfo failure details to store
 */
@Override
public void addRecordOfLastFailure(LastFailureInfo lastFailureInfo) {
    final String parentNode = "lastFailureNode";
    addLastFailureForNode(lastFailureInfo, parentNode);
}

/**
 * Deletes the failure record for {@code id} under the given parent node, if one exists.
 * Failures are logged and not propagated.
 *
 * @param id                  name of the child node to remove
 * @param lastFailureNodeName name of the parent node (without leading slash)
 */
@Override
public void clearRecordOfLastFailure(String id, String lastFailureNodeName) {
    try {
        if (!nodeExistsFor(id, lastFailureNodeName)) {
            // No record for this id - nothing to do.
            return;
        }
        framework.delete()
                .deletingChildrenIfNeeded()
                .forPath("/" + lastFailureNodeName + "/" + id);
        logger.info("Cleared record of last failure for {}", id);
    } catch (Exception e) {
        // Pass the exception as the throwable argument so the stack trace is kept.
        logger.error("Exception clearing record of last failure: ", e);
    }
}

/**
 * Checks whether {@code parentNodeName} has a child called {@code childNodeName}.
 *
 * @param childNodeName  child name to look for
 * @param parentNodeName parent node path without the leading slash
 * @return {@code true} when the child is listed; {@code false} when it is not, or when the
 *         parent itself does not exist
 * @throws Exception the original failure raised while listing the children
 */
private boolean nodeExistsFor(String childNodeName, String parentNodeName) throws Exception {
    try {
        var children = framework.getChildren().forPath("/" + parentNodeName);
        return children.contains(String.valueOf(childNodeName));
    } catch (KeeperException.NoNodeException noNodeException) {
        logger.warn("No node exception: " + noNodeException);
        return false;
    } catch (Exception e) {
        logger.error("Exception getting children nodes:", e);
        // Rethrow the original exception instead of wrapping it in a raw Exception,
        // so callers still see the real failure type.
        throw e;
    }
}

/**
 * @return the failure records stored under the "lastFailureNode" parent
 */
@Override
public List<LastFailureInfo> getRecordsOfLastFailure() {
    final String parentNode = "lastFailureNode";
    return getLastFailureForNode(parentNode);
}

/**
 * Collects the totem errors of every batch that has a child node under "/totemErrorsNode".
 *
 * <p>Batches whose errors cannot be loaded are skipped.
 *
 * @return one entry per readable batch; an empty list when the batch nodes cannot be listed
 */
@Override
public List<TotemBatchError> getRecordsOfTotemErrors() {
    try {
        var batchNodes = framework.getChildren().forPath("/totemErrorsNode");
        return batchNodes.stream()
                .map(this::getLastFailuresForBatch)
                .flatMap(Optional::stream)
                .collect(toList());
    } catch (Exception e) {
        // Pass the exception as the throwable argument so the stack trace is kept.
        logger.error("Error getting records of totem error: ", e);
        return newArrayList();
    }
}

/**
 * Loads all totem error records stored under "/totemErrorsNode/{batchId}".
 *
 * <p>Children that cannot be read or parsed are logged and skipped. The batch total comes from
 * {@code getTotalItemsInBatch(batchId)}.
 *
 * @param batchId batch whose errors are loaded
 * @return the batch's errors, or empty when the batch's children cannot be listed
 */
@Override
public Optional<TotemBatchError> getLastFailuresForBatch(String batchId) {
    try {
        var errors = framework.getChildren().forPath("/totemErrorsNode/" + batchId).stream()
                .flatMap(securityIdNode -> {
                    String childPath = "/totemErrorsNode/" + batchId + "/" + securityIdNode;
                    try {
                        // Records are written as UTF-8 JSON, so decode with the same charset
                        // rather than the platform default.
                        var resultAsString = new String(
                                decompress(framework.getData().forPath(childPath)), StandardCharsets.UTF_8);
                        var fromJson = gson.fromJson(resultAsString, LastFailureInfo.class);
                        return Stream.of(fromJson);
                    } catch (Exception e) {
                        logger.warn("Error while fetching totem error data from {}: {}", childPath, e.getMessage());
                        return Stream.empty();
                    }
                }).collect(toList());
        return Optional.of(new TotemBatchError(batchId, errors.size(), getTotalItemsInBatch(batchId), errors));
    } catch (Exception e) {
        // Pass the exception as the throwable argument so the stack trace is kept.
        logger.error("Error getting records of totem error for batch {}", batchId, e);
        return Optional.empty();
    }
}

/**
 * Stores a totem error for one security of a batch as compressed JSON at
 * "/totemErrorsNode/{batchId}/{securityId}", creating or overwriting the node with a 26-hour
 * TTL node mode. Failures are logged and not propagated.
 *
 * @param batchId         batch the error belongs to
 * @param lastFailureInfo error details; its security id names the node
 */
@Override
public void addRecordOfTotemError(String batchId, LastFailureInfo lastFailureInfo) {
    var timeToLive = java.time.Duration.ofHours(26).toMillis();
    try {
        var payload = compress(gson.toJson(lastFailureInfo).getBytes(StandardCharsets.UTF_8));
        var upsert = framework.create().orSetData()
                .withTtl(timeToLive)
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT_WITH_TTL);
        var errorPath = "/totemErrorsNode/" + batchId + "/" + lastFailureInfo.getSecurityId();
        upsert.forPath(errorPath, payload);
    } catch (Exception e) {
        logger.error("Error when trying to add record of totem error.", e);
    }
}

/**
 * @return the failure records stored under the "onDemandFailureNode" parent
 */
@Override
public List<LastFailureInfo> getRecordsOfOnDemandFailures() {
    final String parentNode = "onDemandFailureNode";
    return getLastFailureForNode(parentNode);
}

/**
 * Records a failure under the "onDemandFailureNode" parent.
 *
 * @param lastFailureInfo failure details to store
 */
@Override
public void addRecordOfOnDemandFailure(LastFailureInfo lastFailureInfo) {
    final String parentNode = "onDemandFailureNode";
    addLastFailureForNode(lastFailureInfo, parentNode);
}

/**
 * Stores the EMS info as compressed JSON at the EMS path, creating or overwriting the node
 * with a 24-hour TTL node mode. Failures are logged and not propagated.
 *
 * @param emsInfo EMS details to store
 */
@Override
public void addEMSMonitoring(EMSInfo emsInfo) {
    try {
        var payload = compress(gson.toJson(emsInfo).getBytes(StandardCharsets.UTF_8));
        var upsert = framework.create().orSetData()
                .withTtl(TTL_24_HOURS)
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT_WITH_TTL);
        upsert.forPath(emsPath, payload);
    } catch (Exception e) {
        logger.error("Error when trying to add EMS info.", e);
    }
}

/**
 * Reads and parses the EMS info stored at the EMS path.
 *
 * @return the stored info with its time-since-last-message refreshed, or {@code null} when the
 *         node does not exist yet or cannot be read
 */
@Override
public EMSInfo getEMSMonitoringInfo() {
    try {
        // The data is written as UTF-8 JSON, so decode with the same charset rather than the
        // platform default.
        var resultAsString = new String(decompress(framework.getData().forPath(emsPath)), StandardCharsets.UTF_8);
        var emsInfo = gson.fromJson(resultAsString, EMSInfo.class);
        // time since last message will be -1 on the first run and needs to be set by calling the function below
        emsInfo.setTimeSinceLastMessage();
        return emsInfo;
    } catch (KeeperException.NoNodeException e) {
        // Expected until the EMS node has been created, so keep this to a single log line.
        logger.error("Error getting EMS info: " + e);
        return null; // nothing to display until the node is created and there is EMS info
    } catch (Exception e) {
        // Unexpected failure: pass the exception as the throwable so the stack trace is kept.
        logger.error("Error getting EMS info: ", e);
        return null;
    }
}

/**
 * Stores a failure record as compressed JSON at "/{nodeName}/{securityId}" with a 12-hour TTL
 * node mode. When a record for the same security already exists, the new record's count is set
 * to the existing count plus one (or to 1 when the existing record has no count).
 * Failures are logged and not propagated.
 *
 * @param lastFailureInfo failure details; its security id names the child node
 * @param nodeName        parent node name without the leading slash
 */
private void addLastFailureForNode(LastFailureInfo lastFailureInfo, String nodeName) {
    var securityId = lastFailureInfo.getSecurityId();
    var timeToLive = java.time.Duration.ofHours(12).toMillis();
    try {
        if (nodeExistsFor(securityId, nodeName)) {
            // Fetch the existing record once and reuse it. Looking it up a second time could
            // find the node already gone and abort the write with NoSuchElementException.
            var existing = getLastFailureForChildNode(nodeName, securityId);
            if (existing.isPresent()) {
                var existingLastFailureInfo = existing.get();
                var newCount = existingLastFailureInfo.countExists() ? existingLastFailureInfo.getCount().incrementAndGet() : 1;
                lastFailureInfo.setCount(newCount);
            }
        }
        var jsonData = gson.toJson(lastFailureInfo);
        var compressedData = compress(jsonData.getBytes(StandardCharsets.UTF_8));
        framework.create().orSetData().withTtl(timeToLive).creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_WITH_TTL).forPath(
                "/" + nodeName + "/" + securityId, compressedData);
    } catch (Exception e) {
        logger.error("Error when trying to add record of last failure to node {}", nodeName, e);
    }
}

/**
 * Loads every failure record stored as a child of "/{nodeName}".
 *
 * <p>Children that cannot be read or parsed are logged and skipped.
 *
 * @param nodeName parent node name without the leading slash
 * @return the readable records; an immutable empty list when the children cannot be listed
 */
private List<LastFailureInfo> getLastFailureForNode(String nodeName) {
    try {
        return framework.getChildren().forPath("/" + nodeName).stream()
                .flatMap(childName -> {
                    String childPath = "/" + nodeName + "/" + childName;
                    try {
                        // Records are written as UTF-8 JSON, so decode with the same charset
                        // rather than the platform default.
                        var resultAsString = new String(
                                decompress(framework.getData().forPath(childPath)), StandardCharsets.UTF_8);
                        var fromJson = gson.fromJson(resultAsString, LastFailureInfo.class);
                        return Stream.of(fromJson);
                    } catch (Exception e) {
                        logger.warn("Error while fetching last failure data from {}: {}", childPath, e.getMessage());
                        return Stream.empty();
                    }
                }).collect(toList());

    } catch (Exception e) {
        // The exception is the trailing argument without its own placeholder so the logger
        // records it as the throwable (with stack trace).
        logger.error("Error getting records of last failure from node {}", nodeName, e);
        // Fully qualified on purpose: the file's single-type import makes the bare name
        // "Collections" refer to a third-party backport class, not java.util.Collections.
        return java.util.Collections.emptyList();
    }
}

/**
 * Loads the failure record of the child of "/{nodeName}" whose name matches {@code ric},
 * ignoring case.
 *
 * @param nodeName parent node name without the leading slash
 * @param ric      child node name to look for (compared case-insensitively)
 * @return the first matching record that could be read and parsed; empty when there is none
 *         or the children cannot be listed
 */
private Optional<LastFailureInfo> getLastFailureForChildNode(String nodeName, String ric) {
    try {
        return framework.getChildren().forPath("/" + nodeName).stream()
                .filter(childName -> childName.equalsIgnoreCase(ric))
                .flatMap(childName -> {
                    var childPath = "/" + nodeName + "/" + childName;
                    try {
                        // Records are written as UTF-8 JSON, so decode with the same charset
                        // rather than the platform default.
                        var resultAsString = new String(
                                decompress(framework.getData().forPath(childPath)), StandardCharsets.UTF_8);
                        var fromJson = gson.fromJson(resultAsString, LastFailureInfo.class);
                        return Stream.of(fromJson);
                    } catch (Exception e) {
                        logger.warn("Error while fetching last failure data from {}: {}", childPath, e.getMessage());
                        return Stream.empty();
                    }
                }).findFirst();
    } catch (Exception e) {
        // The exception is the trailing argument without its own placeholder so the logger
        // records it as the throwable (with stack trace).
        logger.error("Error getting records of last failure from node {}/{}", nodeName, ric, e);
        return Optional.empty();
    }
}

/**
 * @return the Curator client this instance operates on
 */
@Override
public CuratorFramework getFramework() {
    return this.framework;
}

/**
 * Builds the ZooKeeper path of a batch node.
 *
 * @param batchId batch identifier
 * @return "/batch/" followed by the batch id
 */
private String getPathForBatch(String batchId) {
    return new StringBuilder("/batch/").append(batchId).toString();
}

/**
 * Deflate-compresses the given bytes.
 *
 * @param in data to compress
 * @return the compressed bytes, or {@code in} unchanged when compression fails (the failure
 *         is logged)
 */
private byte[] compress(byte[] in) {
    var out = new ByteArrayOutputStream();
    // try-with-resources closes the deflater stream even when write() throws; closing also
    // finishes the stream, so the buffer is complete once the block exits.
    try (var deflater = new DeflaterOutputStream(out)) {
        deflater.write(in);
    } catch (Exception e) {
        logger.error("Error trying to compress data.", e);
        return in;
    }
    return out.toByteArray();
}

/**
 * Inflates deflate-compressed bytes.
 *
 * @param in data to decompress
 * @return the decompressed bytes, or {@code in} unchanged when it cannot be inflated (the
 *         failure is logged)
 */
private byte[] decompress(byte[] in) {
    var out = new ByteArrayOutputStream();
    // try-with-resources closes the inflater stream even when write() throws; closing also
    // flushes any remaining output, so the buffer is complete once the block exits.
    try (var inflater = new InflaterOutputStream(out)) {
        inflater.write(in);
    } catch (Exception e) {
        logger.error("Error trying to decompress data.", e);
        return in;
    }
    return out.toByteArray();
}

}


本文转载自: https://blog.csdn.net/weixin_38964818/article/details/136092567
版权归原作者 weixin_38964818 所有, 如有侵权,请联系我们删除。

“zookeeper”的评论:

还没有评论