package com.citi.eqriskvolanalytics.hubblecommon.zookeeper;
import com.citi.eqriskvolanalytics.hubblecommon.utility.Status;
import com.gemstone.bp.edu.emory.mathcs.backport.java.util.Collections;
import com.google.common.base.Strings;
import com.google.common.primitives.Ints;
import com.google.gson.Gson;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.nodes.PersistentTtlNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;
import static com.google.common.collect.Lists.newArrayList;
import static java.util.Collections.emptySet;
import java.util.stream.Stream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterOutputStream;
import static java.util.stream.Collectors.toList;
public class ZooKeeperActual implements ZooKeeper {
private static final long TTL_24_HOURS = 1000 * 60 * 60 * 24;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final CuratorFramework framework;
private final LeaderLatch scheduleLeaderLatch;
private PersistentTtlNode capacityNode;
private String capacityPath;
private String emsPath;
private final Gson gson = new Gson();
public ZooKeeperActual(String connectionString, Status status, String namespace) {
try {
var builder = CuratorFrameworkFactory.builder()
.connectString(connectionString)
.retryPolicy(new ExponentialBackoffRetry(1000, 29));
framework = Strings.isNullOrEmpty(namespace) ? builder.build() : builder.namespace(namespace).build();
framework.start();
capacityPath = "/capacity/" + status.getHost();
logger.info("Creating capacity node with path: {}", capacityPath);
capacityNode = new PersistentTtlNode(framework, capacityPath, 30_000, Ints.toByteArray(0));
capacityNode.start();
emsPath = "/ems/statistics";
scheduleLeaderLatch = new LeaderLatch(framework, "/schedule-leadership");
scheduleLeaderLatch.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected ZooKeeperActual(CuratorFramework framework) {
this.framework = framework;
capacityNode = new PersistentTtlNode(framework, "/capacity/tmp", 30_000, Ints.toByteArray(0));
scheduleLeaderLatch = new LeaderLatch(framework, "/schedule-leadership");
}
@Override
public void start() {
}
@Override
public void stop() {
safeClose(scheduleLeaderLatch);
safeClose(capacityNode);
framework.close();
}
private void safeClose(Closeable closeable) {
try {
closeable.close();
} catch (Exception e) {
logger.warn("Exception closing {} with error {}", closeable.toString(), e);
}
}
@Override
public String getString(String path) {
try {
var data = framework.getData().forPath(path);
return new String(data, StandardCharsets.UTF_8);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public String setString(String path, String value) {
try {
var data = value.getBytes(StandardCharsets.UTF_8);
return framework.create().orSetData().forPath(path, data);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean hasScheduleLeadership() {
return scheduleLeaderLatch.hasLeadership();
}
@Override
public void updateBatchTimestamp(String batchId) {
try {
// Upserts a node for the given batch id that will be deleted after 24 hours
framework.create()
.orSetData()
.withTtl(TTL_24_HOURS)
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.forPath(getPathForBatch(batchId));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Optional<Instant> getLastBatchTimestamp(String batchId) {
try {
var stat = framework.checkExists().forPath(getPathForBatch(batchId));
return Optional.ofNullable(stat)
.map(s -> Instant.ofEpochMilli(s.getMtime()));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Set<String> getCurrentTotemBatches() {
var path = "/batch";
try {
var stat = framework.checkExists().forPath(path);
if (stat != null) {
var childNodes = framework.getChildren().forPath(path);
return new HashSet<>(childNodes);
} else {
return emptySet();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void clearTotemBatch(String batchId) {
try {
framework.delete()
.deletingChildrenIfNeeded()
.forPath(getPathForBatch(batchId));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void clearAllTotemBatches() {
var path = "/batch";
try {
var children = framework.getChildren().forPath(path);
children.forEach(childNodePath -> {
try {
framework.delete()
.deletingChildrenIfNeeded()
.forPath(getPathForBatch(childNodePath));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void writeEndOfBatch(String batchId) {
try {
// Upserts a node for the given batch id that will be deleted after 24 hours
framework.create()
.orSetData()
.withTtl(TTL_24_HOURS)
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.forPath(getPathForBatch(batchId) + "/end");
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean getEndOfBatch(String batchId) {
var path = getPathForBatch(batchId) + "/end";
try {
var stat = framework.checkExists().forPath(path);
// If the node exists the batch has finished so return true
return stat != null;
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public int getTotalItemsInBatch(String batchId) {
var path = getPathForBatch(batchId) + "/size";
try {
if (nodeExistsFor("size", "batch/" + batchId)) {
return Ints.fromByteArray(framework.getData().forPath(path));
} else {
logger.info("Deleting old batch: " + getPathForBatch(batchId));
framework.delete()
.deletingChildrenIfNeeded()
.forPath(getPathForBatch(batchId));
return 1;
}
} catch (Exception e) {
logger.error("Error getting total items in batch: " + e);
return 1; // to avoid division by 0
}
}
@Override
public void updateBatchSize(String batchId, int size) {
var path = getPathForBatch(batchId) + "/size";
try {
framework.create()
.orSetData()
.withTtl(TTL_24_HOURS)
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.forPath(path, Ints.toByteArray(size));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void updateAvailableCapacityInLastHour(int seconds) {
if (hasScheduleLeadership()) {
try {
logger.info("Pushing capacity of {} to {}", seconds, capacityPath);
capacityNode.setData(Ints.toByteArray(seconds));
} catch (Exception e) {
logger.error("Exception updating data", e);
}
} else {
logger.info("Node with path {} is not leader - not pushing capacity here", capacityPath);
}
}
@Override
public int getTotalCurrentCapacity() {
try {
var capacity = framework.getChildren().forPath("/capacity").stream()
.map(childName -> {
String childCapacityPath = "/capacity/" + childName;
try {
var capacityForChild = Ints.fromByteArray(framework.getData().forPath(childCapacityPath));
logger.info("Got {} capacity from {}", capacityForChild, childCapacityPath);
return capacityForChild;
} catch (Exception e) {
logger.warn("Exception while fetching capacity path {}: {}", childCapacityPath, e.getMessage());
return 0;
}
}).mapToInt(Integer::intValue).sum();
logger.info("Current total capacity: {}", capacity);
return capacity;
} catch (Exception e) {
logger.error("Exception getting children", e);
return -1;
}
}
@Override
public void addRecordOfLastFailure(LastFailureInfo lastFailureInfo) {
addLastFailureForNode(lastFailureInfo, "lastFailureNode");
}
@Override
public void clearRecordOfLastFailure(String id, String lastFailureNodeName) {
// if node exists for id delete node, else do nothing
try {
if (nodeExistsFor(id, lastFailureNodeName)) {
framework.delete()
.deletingChildrenIfNeeded()
.forPath("/" + lastFailureNodeName + "/" + id);
logger.info("Cleared record of last failure for " + id);
}
} catch (Exception e) {
logger.error("Exception clearing record of last failure: " + e);
}
}
private boolean nodeExistsFor(String childNodeName, String parentNodeName) throws Exception {
try {
var children = framework.getChildren().forPath("/" + parentNodeName);
return children.contains(String.valueOf(childNodeName));
} catch (KeeperException.NoNodeException noNodeException) {
logger.warn("No node exception: " + noNodeException);
return false;
} catch (Exception e) {
logger.error("Exception getting children nodes:", e);
throw new Exception(e);
}
}
@Override
public List<LastFailureInfo> getRecordsOfLastFailure() {
return getLastFailureForNode("lastFailureNode");
}
@Override
public List<TotemBatchError> getRecordsOfTotemErrors() {
try {
var batchNodes = framework.getChildren().forPath("/totemErrorsNode");
return batchNodes.stream()
.map(this::getLastFailuresForBatch).filter(Optional::isPresent).map(Optional::get).collect(toList());
} catch (Exception e) {
logger.error("Error getting records of totem error: " + e);
return newArrayList();
}
}
@Override
public Optional<TotemBatchError> getLastFailuresForBatch(String batchId) {
try {
var errors = framework.getChildren().forPath("/totemErrorsNode/" + batchId).stream()
.flatMap(securityIdNode -> {
String childPath = "/totemErrorsNode/" + batchId + "/" + securityIdNode;
try {
var resultAsString = new String(decompress(framework.getData().forPath(childPath)));
var fromJson = gson.fromJson(resultAsString, LastFailureInfo.class);
return Stream.of(fromJson);
} catch (Exception e) {
logger.warn("Error while fetching totem error data from {}: {}", childPath, e.getMessage());
return Stream.empty();
}
}).collect(toList());
return Optional.of(new TotemBatchError(batchId, errors.size(), getTotalItemsInBatch(batchId), errors));
} catch (Exception e) {
logger.error("Error getting records of totem error: " + e);
return Optional.empty();
}
}
@Override
public void addRecordOfTotemError(String batchId, LastFailureInfo lastFailureInfo) {
var timeToLive = java.time.Duration.ofHours(26).toMillis();
try {
var jsonData = gson.toJson(lastFailureInfo);
var compressedData = compress(jsonData.getBytes(StandardCharsets.UTF_8));
framework.create().orSetData()
.withTtl(timeToLive)
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.forPath("/totemErrorsNode/" + batchId + "/" + lastFailureInfo.getSecurityId(), compressedData);
} catch (Exception e) {
logger.error("Error when trying to add record of totem error.", e);
}
}
@Override
public List<LastFailureInfo> getRecordsOfOnDemandFailures() {
return getLastFailureForNode("onDemandFailureNode");
}
@Override
public void addRecordOfOnDemandFailure(LastFailureInfo lastFailureInfo) {
addLastFailureForNode(lastFailureInfo, "onDemandFailureNode");
}
@Override
public void addEMSMonitoring(EMSInfo emsInfo) {
try {
var jsonData = gson.toJson(emsInfo);
var compressedData = compress(jsonData.getBytes(StandardCharsets.UTF_8));
framework.create().orSetData()
.withTtl(TTL_24_HOURS)
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.forPath(emsPath, compressedData);
} catch (Exception e) {
logger.error("Error when trying to add EMS info.", e);
}
}
@Override
public EMSInfo getEMSMonitoringInfo() {
try {
var resultAsString = new String(decompress(framework.getData().forPath(emsPath)));
var emsInfo = gson.fromJson(resultAsString, EMSInfo.class);
// time since last message will be -1 on the first run and needs to be set by calling the function below
emsInfo.setTimeSinceLastMessage();
return emsInfo;
} catch (Exception e) {
logger.error("Error getting EMS info: " + e);
return null; // nothing to display until the node is created and there is EMS info
}
}
private void addLastFailureForNode(LastFailureInfo lastFailureInfo, String nodeName) {
var securityId = lastFailureInfo.getSecurityId();
var timeToLive = java.time.Duration.ofHours(12).toMillis();
try {
if (nodeExistsFor(securityId, nodeName)) {
// node already exists, increment count
if (getLastFailureForChildNode(nodeName, securityId).isPresent()) {
var existingLastFailureInfo = getLastFailureForChildNode(nodeName, securityId).get();
var newCount = existingLastFailureInfo.countExists() ? existingLastFailureInfo.getCount().incrementAndGet() : 1;
lastFailureInfo.setCount(newCount);
}
}
var jsonData = gson.toJson(lastFailureInfo);
var compressedData = compress(jsonData.getBytes(StandardCharsets.UTF_8));
framework.create().orSetData().withTtl(timeToLive).creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_WITH_TTL).forPath(
"/" + nodeName + "/" + securityId, compressedData);
} catch (Exception e) {
logger.error("Error when trying to add record of last failure to node {}", nodeName, e);
}
}
private List<LastFailureInfo> getLastFailureForNode(String nodeName) {
try {
return framework.getChildren().forPath("/" + nodeName).stream()
.flatMap(childName -> {
String childPath = "/" + nodeName + "/" + childName;
try {
var resultAsString = new String(decompress(framework.getData().forPath(childPath)));
var fromJson = gson.fromJson(resultAsString, LastFailureInfo.class);
return Stream.of(fromJson);
} catch (Exception e) {
logger.warn("Error while fetching last failure data from {}: {}", childPath, e.getMessage());
return Stream.empty();
}
}).collect(toList());
} catch (Exception e) {
logger.error("Error getting records of last failure from node {} : {}", nodeName, e);
return Collections.emptyList();
}
}
private Optional<LastFailureInfo> getLastFailureForChildNode(String nodeName, String ric) {
try {
return framework.getChildren().forPath("/" + nodeName).stream()
.flatMap(childName -> {
if (childName.equalsIgnoreCase(ric)) {
var childPath = "/" + nodeName + "/" + childName;
try {
var resultAsString = new String(decompress(framework.getData().forPath(childPath)));
var fromJson = gson.fromJson(resultAsString, LastFailureInfo.class);
return Stream.of(fromJson);
} catch (Exception e) {
logger.warn("Error while fetching last failure data from {}: {}", childPath, e.getMessage());
return Stream.empty();
}
} else {
return Stream.empty();
}
}).findFirst();
} catch (Exception e) {
logger.error("Error getting records of last failure from node {}/{}: {}", nodeName, ric, e);
return Optional.empty();
}
}
@Override
public CuratorFramework getFramework() {
return framework;
}
private String getPathForBatch(String batchId) {
return "/batch/" + batchId;
}
private byte[] compress(byte[] in) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DeflaterOutputStream defl = new DeflaterOutputStream(out);
defl.write(in);
defl.flush();
defl.close();
return out.toByteArray();
} catch (Exception e) {
logger.error("Error trying to compress data.", e);
return in;
}
}
private byte[] decompress(byte[] in) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
InflaterOutputStream infl = new InflaterOutputStream(out);
infl.write(in);
infl.flush();
infl.close();
return out.toByteArray();
} catch (Exception e) {
logger.error("Error trying to decompress data.", e);
return in;
}
}
}
// Copyright belongs to the original author weixin_38964818; in case of infringement, please contact us for removal.