8000 Add metadata dump and load jraft snapshot by KomachiSion · Pull Request #4367 · alibaba/nacos · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add metadata dump and load jraft snapshot #4367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public String getType() {
public static class None extends AbstractHealthChecker {

public static final String TYPE = "NONE";


private static final long serialVersionUID = -760631831097384737L;

public None() {
super(TYPE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class Http extends AbstractHealthChecker {

public static final String TYPE = "HTTP";

private static final long serialVersionUID = 551826315222362349L;

private String path = "";

private String headers = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class Mysql extends AbstractHealthChecker {

public static final String TYPE = "MYSQL";

private static final long serialVersionUID = 7928108094599401491L;

private String user;

private String pwd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class Tcp extends AbstractHealthChecker {

public static final String TYPE = "TCP";

private static final long serialVersionUID = -9116042038157496294L;

public Tcp() {
super(TYPE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class TestChecker extends AbstractHealthChecker {
@JsonTypeInfo(use = Id.NAME, property = "type")
public static final String TYPE = "TEST";

private static final long serialVersionUID = 2472091207760970225L;

private String testValue;

public String getTestValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,7 @@ private DefaultRequestFuture sendRequestInner(Request request, RequestMeta meta,
sendRequestNoAck(request, meta);

DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getMetaInfo().getConnectionId(), requestId,
callBack, new DefaultRequestFuture.TimeoutInnerTrigger() {
@Override
< 6D47 /td> public void triggerOnTimeout() {
RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId);
}
});
callBack, () -> RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId));

RpcAckCallbackSynchronizer.syncCallback(getMetaInfo().getConnectionId(), requestId, defaultPushFuture);
return defaultPushFuture;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.persistent.impl;

import com.alibaba.nacos.consistency.snapshot.Reader;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.consistency.snapshot.Writer;
import com.alibaba.nacos.core.distributed.raft.utils.RaftExecutor;
import com.alibaba.nacos.core.utils.TimerContext;
import com.alibaba.nacos.naming.misc.Loggers;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;

/**
* Abstract snapshot operation.
*
* @author xiweng.yy
*/
public abstract class AbstractSnapshotOperation implements SnapshotOperation {

protected static final String CHECK_SUM_KEY = "checksum";

private final ReentrantReadWriteLock.WriteLock writeLock;

public AbstractSnapshotOperation(ReentrantReadWriteLock lock) {
this.writeLock = lock.writeLock();
}

@Override
public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFinally) {
RaftExecutor.doSnapshot(() -> {
TimerContext.start(getSnapshotSaveTag());
final Lock lock = writeLock;
lock.lock();
try {
callFinally.accept(writeSnapshot(writer), null);
} catch (Throwable t) {
Loggers.RAFT.error("Fail to compress snapshot, path={}, file list={}.", writer.getPath(),
writer.listFiles(), t);
callFinally.accept(false, t);
} finally {
lock.unlock();
TimerContext.end(getSnapshotSaveTag(), Loggers.RAFT);
}
});
}

@Override
public boolean onSnapshotLoad(Reader reader) {
TimerContext.start(getSnapshotLoadTag());
final Lock lock = writeLock;
lock.lock();
try {
return readSnapshot(reader);
} catch (final Throwable t) {
Loggers.RAFT
.error("Fail to load snapshot, path={}, file list={}.", reader.getPath(), reader.listFiles(), t);
return false;
} finally {
lock.unlock();
TimerContext.end(getSnapshotLoadTag(), Loggers.RAFT);
}
}

/**
* Write snapshot.
*
* @param writer snapshot writer
* @return {@code true} if write snapshot successfully, otherwise {@code false}
* @throws Exception any exception during writing
*/
protected abstract boolean writeSnapshot(Writer writer) throws Exception;

/**
* Read snapshot.
*
* @param reader snapshot reader
* @return {@code true} if read snapshot successfully, otherwise {@code false}
* @throws Exception any exception during reading
*/
protected abstract boolean readSnapshot(Reader reader) throws Exception;

/**
* Get snapshot save tag. It will be used to see time metric time context.
*
* @return snapshot save tag
*/
protected abstract String getSnapshotSaveTag();

/**
* Get snapshot load tag. It will be used to see time metric time context.
*
* @return snapshot load tag
*/
protected abstract String getSnapshotLoadTag();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,23 @@
import com.alibaba.nacos.common.utils.Objects;
import com.alibaba.nacos.consistency.snapshot.LocalFileMeta;
import com.alibaba.nacos.consistency.snapshot.Reader;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.consistency.snapshot.Writer;
import com.alibaba.nacos.core.distributed.raft.utils.RaftExecutor;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.core.utils.TimerContext;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.alipay.sofa.jraft.util.CRC64;

import java.nio.file.Paths;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.zip.Checksum;

/**
* Snapshot processing of persistent service data for accelerated Raft protocol recovery and data synchronization.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
* @author xiweng.yy
*/
public class NamingSnapshotOperation implements SnapshotOperation {
public class NamingSnapshotOperation extends AbstractSnapshotOperation {

private static final String NAMING_SNAPSHOT_SAVE = NamingSnapshotOperation.class.getSimpleName() + ".SAVE";

Expand All @@ -49,81 +45,57 @@ public class NamingSnapshotOperation implements SnapshotOperation {

private final String snapshotArchive = "naming_persistent.zip";

private final String checkSumKey = "checkSum";

private final KvStorage storage;

private final ReentrantReadWriteLock.WriteLock writeLock;

public NamingSnapshotOperation(KvStorage storage, ReentrantReadWriteLock lock) {
super(lock);
this.storage = storage;
this.writeLock = lock.writeLock();
}

@Override
public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFinally) {
RaftExecutor.doSnapshot(() -> {
TimerContext.start(NAMING_SNAPSHOT_SAVE);

final Lock lock = writeLock;
lock.lock();
try {
final String writePath = writer.getPath();
final String parentPath = Paths.get(writePath, snapshotDir).toString();
DiskUtils.deleteDirectory(parentPath);
DiskUtils.forceMkdir(parentPath);

storage.doSnapshot(parentPath);
final String outputFile = Paths.get(writePath, snapshotArchive).toString();
final Checksum checksum = new CRC64();
DiskUtils.compress(writePath, snapshotDir, outputFile, checksum);
DiskUtils.deleteDirectory(parentPath);

final LocalFileMeta meta = new LocalFileMeta();
meta.append(checkSumKey, Long.toHexString(checksum.getValue()));

callFinally.accept(writer.addFile(snapshotArchive, meta), null);
} catch (Throwable t) {
Loggers.RAFT.error("Fail to compress snapshot, path={}, file list={}, {}.", writer.getPath(),
writer.listFiles(), t);
callFinally.accept(false, t);
} finally {
lock.unlock();
TimerContext.end(NAMING_SNAPSHOT_SAVE, Loggers.RAFT);
}
});
protected boolean writeSnapshot(Writer writer) throws Exception {
final String writePath = writer.getPath();
final String parentPath = Paths.get(writePath, snapshotDir).toString();
DiskUtils.deleteDirectory(parentPath);
DiskUtils.forceMkdir(parentPath);

storage.doSnapshot(parentPath);
final String outputFile = Paths.get(writePath, snapshotArchive).toString();
final Checksum checksum = new CRC64();
DiskUtils.compress(writePath, snapshotDir, outputFile, checksum);
DiskUtils.deleteDirectory(parentPath);

final LocalFileMeta meta = new LocalFileMeta();
meta.append(CHECK_SUM_KEY, Long.toHexString(checksum.getValue()));
return writer.addFile(snapshotArchive, meta);
}

@Override
public boolean onSnapshotLoad(Reader reader) {
protected boolean readSnapshot(Reader reader) throws Exception {
final String readerPath = reader.getPath();
final String sourceFile = Paths.get(readerPath, snapshotArchive).toString();

TimerContext.start(NAMING_SNAPSHOT_LOAD);
final Lock lock = writeLock;
lock.lock();
try {
final Checksum checksum = new CRC64();
DiskUtils.decompress(sourceFile, readerPath, checksum);
LocalFileMeta fileMeta = reader.getFileMeta(snapshotArchive);
if (fileMeta.getFileMeta().containsKey(checkSumKey)) {
if (!Objects.equals(Long.toHexString(checksum.getValue()), fileMeta.get(checkSumKey))) {
throw new IllegalArgumentException("Snapshot checksum failed");
}
final Checksum checksum = new CRC64();
DiskUtils.decompress(sourceFile, readerPath, checksum);
LocalFileMeta fileMeta = reader.getFileMeta(snapshotArchive);
if (fileMeta.getFileMeta().containsKey(CHECK_SUM_KEY)) {
if (!Objects.equals(Long.toHexString(checksum.getValue()), fileMeta.get(CHECK_SUM_KEY))) {
throw new IllegalArgumentException("Snapshot checksum failed");
}

final String loadPath = Paths.get(readerPath, snapshotDir).toString();
storage.snapshotLoad(loadPath);
Loggers.RAFT.info("snapshot load from : {}", loadPath);
DiskUtils.deleteDirectory(loadPath);
return true;
} catch (final Throwable t) {
Loggers.RAFT.error("Fail to load snapshot, path={}, file list={}, {}.",
Paths.get(readerPath, snapshotDir).toString(), reader.listFiles(), t);
return false;
} finally {
lock.unlock();
TimerContext.end(NAMING_SNAPSHOT_LOAD, Loggers.RAFT);
}
final String loadPath = Paths.get(readerPath, snapshotDir).toString();
storage.snapshotLoad(loadPath);
Loggers.RAFT.info("snapshot load from : {}", loadPath);
DiskUtils.deleteDirectory(loadPath);
return true;
}

@Override
protected String getSnapshotSaveTag() {
return NAMING_SNAPSHOT_SAVE;
}

@Override
protected String getSnapshotLoadTag() {
return NAMING_SNAPSHOT_LOAD;
}
}
Loading
0