/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.io.file;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.scalecube.services.annotations.ServiceMethod;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.function.Function;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveQuery;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.exception.NotFoundException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.io.file.FileEntity;
import org.jetlinks.community.io.file.FileInfo;
import org.jetlinks.community.io.file.FileManager;
import org.jetlinks.community.io.file.FileOption;
import org.jetlinks.community.io.file.FileProperties;
import org.jetlinks.core.rpc.RpcManager;
import org.reactivestreams.Publisher;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.codec.multipart.FilePart;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link FileManager} that stores file content on the local file system of the
 * node that received the upload, and records the file's metadata (including the
 * id of the node holding the content) through a {@link ReactiveRepository}.
 *
 * <p>When a file is read on a node other than the one recorded in its
 * {@link FileEntity}, the content is fetched from the owning node through the
 * {@link Service} RPC interface obtained from the {@link RpcManager}.
 */
public class ClusterFileManager implements FileManager {
    private final FileProperties properties;
    private final NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
    private final ReactiveRepository<FileEntity, String> repository;
    private final RpcManager rpcManager;

    /**
     * Creates the manager, ensures the storage base directory exists and
     * registers the RPC endpoint other nodes use to read files stored here.
     *
     * @param rpcManager RPC manager used to register {@link ServiceImpl} and to look up remote nodes
     * @param properties storage settings (base path, read buffer size)
     * @param repository repository holding the {@link FileEntity} metadata records
     */
    public ClusterFileManager(RpcManager rpcManager, FileProperties properties, ReactiveRepository<FileEntity, String> repository) {
        // Best effort: the result is ignored, a missing directory surfaces later when writing.
        new File(properties.getStorageBasePath()).mkdirs();
        this.properties = properties;
        this.rpcManager = rpcManager;
        this.repository = repository;
        // Registered last so the service only ever sees a fully initialised manager.
        rpcManager.registerService(new ServiceImpl());
    }

    /**
     * Saves an uploaded multipart file using its original file name.
     *
     * @param filePart the uploaded part
     * @param options  options to store with the file
     * @return the stored file's metadata
     */
    @Override
    public Mono<FileInfo> saveFile(FilePart filePart, FileOption... options) {
        // Forward the caller's options; previously an empty array was passed and they were lost.
        return this.saveFile(filePart.filename(), filePart.content(), options);
    }

    /**
     * Feeds the readable bytes of {@code dataBuffer} into {@code digest} and
     * returns the same buffer so the call can be chained inside a {@code map}.
     * The retain/release pair leaves the buffer's reference count unchanged.
     */
    private DataBuffer updateDigest(MessageDigest digest, DataBuffer dataBuffer) {
        dataBuffer = DataBufferUtils.retain(dataBuffer);
        digest.update(dataBuffer.asByteBuffer());
        DataBufferUtils.release(dataBuffer);
        return dataBuffer;
    }

    /**
     * Writes {@code stream} to a new file below the storage base path and
     * inserts the matching {@link FileEntity}.
     *
     * <p>All per-upload state (file id, storage path, digests) is created when
     * the returned {@code Mono} is subscribed, so every subscription performs an
     * independent upload.
     *
     * @param name    file name supplied by the caller
     * @param stream  file content
     * @param options options to store with the file
     * @return the stored file's metadata; fails with {@link BusinessException}
     *         ({@code error.file_storage_failed}) when the file does not exist after writing
     */
    public Mono<FileInfo> doSaveFile(String name, Flux<DataBuffer> stream, FileOption... options) {
        return Mono.defer(() -> this.storeFile(name, stream, options));
    }

    /**
     * Performs one upload: allocates id and path, streams the content to disk
     * while computing MD5 and SHA-256, then persists the metadata record.
     */
    private Mono<FileInfo> storeFile(String name, Flux<DataBuffer> stream, FileOption[] options) {
        FileInfo fileInfo = new FileInfo();
        fileInfo.setId(IDGenerator.MD5.generate());
        fileInfo.withFileName(name);

        // Relative path: <yyyyMMdd>/<id>.<extension>
        String storagePath = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE)
            + "/" + fileInfo.getId() + "." + fileInfo.getExtension();

        MessageDigest md5 = DigestUtils.getMd5Digest();
        MessageDigest sha256 = DigestUtils.getSha256Digest();
        String serverNodeId = this.rpcManager.currentServerId();
        Path path = Paths.get(this.properties.getStorageBasePath(), storagePath);
        path.toFile().getParentFile().mkdirs();

        return stream
            // Both digests are updated as each buffer passes through to the writer.
            .map(buffer -> this.updateDigest(md5, this.updateDigest(sha256, buffer)))
            // CREATE_NEW makes the write fail if the target file already exists.
            .as(buffers -> DataBufferUtils.write(
                buffers,
                path,
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE_NEW,
                StandardOpenOption.TRUNCATE_EXISTING))
            .then(Mono.defer(() -> {
                File savedFile = path.toFile();
                if (!savedFile.exists()) {
                    return Mono.<FileInfo>error(new BusinessException("error.file_storage_failed"));
                }
                fileInfo.withAccessKey(IDGenerator.MD5.generate());
                fileInfo.setMd5(ByteBufUtil.hexDump(md5.digest()));
                fileInfo.setSha256(ByteBufUtil.hexDump(sha256.digest()));
                fileInfo.setLength(savedFile.length());
                fileInfo.setCreateTime(System.currentTimeMillis());
                fileInfo.setOptions(options);
                FileEntity entity = FileEntity.of(fileInfo, storagePath, serverNodeId);
                return this.repository
                    .insert(entity)
                    .then(Mono.fromSupplier(entity::toInfo));
            }));
    }

    /**
     * Saves the given content under the given name; delegates to {@link #doSaveFile}.
     */
    @Override
    public Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream, FileOption... options) {
        return this.doSaveFile(name, stream, options);
    }

    /**
     * Looks up one file record by its MD5 digest.
     *
     * @param md5 MD5 value to match against the stored records
     * @return the matching file's metadata, or empty when none matches
     */
    @Override
    public Mono<FileInfo> getFileByMd5(String md5) {
        return this.repository
            .createQuery()
            .where(FileEntity::getMd5, md5)
            .fetchOne()
            .map(FileEntity::toInfo);
    }

    /**
     * Looks up one file record by its SHA-256 digest.
     *
     * @param sha256 SHA-256 value to match against the stored records
     * @return the matching file's metadata, or empty when none matches
     */
    @Override
    public Mono<FileInfo> getFileBySha256(String sha256) {
        return this.repository
            .createQuery()
            .where(FileEntity::getSha256, sha256)
            .fetchOne()
            .map(FileEntity::toInfo);
    }

    /**
     * Looks up a file record by id.
     *
     * @param id file id
     * @return the file's metadata, or empty when the id is unknown
     */
    @Override
    public Mono<FileInfo> getFile(String id) {
        return this.repository
            .findById(id)
            .map(FileEntity::toInfo);
    }

    /**
     * Reads a locally stored file starting at {@code position}.
     * A {@link NoSuchFileException} is translated to {@link NotFoundException}.
     *
     * @param filePath path relative to the storage base path
     * @param position offset passed to the underlying read
     */
    private Flux<DataBuffer> readFile(String filePath, long position) {
        Path absolute = Paths.get(this.properties.getStorageBasePath(), filePath);
        int bufferSize = (int) this.properties.getReadBufferSize().toBytes();
        return DataBufferUtils
            .read(new FileSystemResource(absolute), position, this.bufferFactory, bufferSize)
            .onErrorMap(NoSuchFileException.class, e -> new NotFoundException());
    }

    /**
     * Reads the file from local disk when this node owns it, otherwise from
     * the node recorded in the entity.
     */
    private Flux<DataBuffer> readFile(FileEntity file, long position) {
        if (Objects.equals(file.getServerNodeId(), this.rpcManager.currentServerId())) {
            return this.readFile(file.getStoragePath(), position);
        }
        return this.readFromAnotherServer(file, position);
    }

    /**
     * Reads the file through the {@link Service} exposed by the node that
     * stores it. Fails with {@link NotFoundException} when that node's service
     * cannot be obtained.
     *
     * @param file     record of the file to read
     * @param position offset forwarded to the remote node
     */
    protected Flux<DataBuffer> readFromAnotherServer(FileEntity file, long position) {
        return this.rpcManager
            .getService(file.getServerNodeId(), Service.class)
            .switchIfEmpty(Mono.error(NotFoundException::new))
            .flatMapMany(service -> service.read(new ReadRequest(file.getId(), position)))
            .<DataBuffer>map(byteBuf -> this.bufferFactory.wrap(byteBuf))
            // Release pooled buffers that are dropped before reaching a subscriber.
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
    }

    /**
     * Reads the whole file with the given id.
     */
    @Override
    public Flux<DataBuffer> read(String id) {
        return this.read(id, 0L);
    }

    /**
     * Reads the file with the given id starting at {@code position}.
     * Fails with {@link NotFoundException} when the id is unknown.
     */
    @Override
    public Flux<DataBuffer> read(String id, long position) {
        return this.repository
            .findById(id)
            .switchIfEmpty(Mono.error(NotFoundException::new))
            .flatMapMany(file -> this.readFile(file, position));
    }

    /**
     * Reads the file with the given id, first letting {@code beforeRead}
     * inspect the file's metadata and optionally change the start position.
     *
     * @param id         file id
     * @param beforeRead callback invoked with a {@link FileManager.ReaderContext};
     *                   reading starts once the Mono it returns completes
     */
    @Override
    public Flux<DataBuffer> read(String id, Function<FileManager.ReaderContext, Mono<Void>> beforeRead) {
        return this.repository
            .findById(id)
            .switchIfEmpty(Mono.error(NotFoundException::new))
            .flatMapMany(file -> {
                DefaultReaderContext context = new DefaultReaderContext(file.toInfo(), 0L);
                return beforeRead
                    .apply(context)
                    // Deferred so the position is read only after beforeRead has completed.
                    .thenMany(Flux.defer(() -> this.readFile(file, context.position)));
            });
    }

    /**
     * RPC endpoint serving files to other nodes; delegates to the enclosing manager.
     */
    public class ServiceImpl implements Service {
        @Override
        public Flux<ByteBuf> read(ReadRequest request) {
            return ClusterFileManager.this
                .read(request.id, request.position)
                .map(buf -> {
                    // Netty-backed buffers are passed on as-is; others are wrapped.
                    if (buf instanceof NettyDataBuffer) {
                        return ((NettyDataBuffer) buf).getNativeBuffer();
                    }
                    return Unpooled.wrappedBuffer(buf.asByteBuffer());
                });
        }
    }

    /**
     * RPC contract for reading file content from the node that stores it.
     */
    @io.scalecube.services.annotations.Service
    public interface Service {
        @ServiceMethod
        Flux<ByteBuf> read(ReadRequest var1);
    }

    /**
     * Request payload for {@link Service#read(ReadRequest)}: the file id and
     * the position to start reading from.
     */
    public static class ReadRequest {
        private String id;
        private long position;

        public ReadRequest() {
        }

        public ReadRequest(String id, long position) {
            this.id = id;
            this.position = position;
        }

        public String getId() {
            return this.id;
        }

        public long getPosition() {
            return this.position;
        }

        public void setId(String id) {
            this.id = id;
        }

        public void setPosition(long position) {
            this.position = position;
        }
    }

    /**
     * {@link FileManager.ReaderContext} holding the file's metadata and a
     * mutable start position that the {@code beforeRead} callback may change.
     */
    private static class DefaultReaderContext implements FileManager.ReaderContext {
        private final FileInfo info;
        private long position;

        public DefaultReaderContext(FileInfo info, long position) {
            this.info = info;
            this.position = position;
        }

        @Override
        public FileInfo info() {
            return this.info;
        }

        @Override
        public void position(long position) {
            this.position = position;
        }
    }
}

