/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.buffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.BasicDataType;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.MemoryUsage;
import org.jetlinks.community.codec.Serializers;
import org.jetlinks.core.cache.FileQueue;
import org.jetlinks.core.cache.FileQueueProxy;
import org.jetlinks.core.utils.SerializeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

public class PersistenceBuffer<T extends Serializable>
implements Disposable {
    private static final AtomicIntegerFieldUpdater<PersistenceBuffer> WIP = AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "wip");
    private static final AtomicIntegerFieldUpdater<PersistenceBuffer> REMAINDER = AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "remainder");
    private static final AtomicIntegerFieldUpdater<PersistenceBuffer> DEAD_SZIE = AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "deadSize");
    private static final AtomicReferenceFieldUpdater<PersistenceBuffer, Collection> BUFFER = AtomicReferenceFieldUpdater.newUpdater(PersistenceBuffer.class, Collection.class, "buffer");
    private static final AtomicReferenceFieldUpdater<PersistenceBuffer, Boolean> DISPOSED = AtomicReferenceFieldUpdater.newUpdater(PersistenceBuffer.class, Boolean.class, "disposed");
    private Logger logger = LoggerFactory.getLogger(PersistenceBuffer.class);
    private String name = "unknown";
    private FileQueue<Buf<T>> queue;
    private FileQueue<Buf<T>> deadQueue;
    private final Function<Flux<T>, Mono<Boolean>> handler;
    private BufferSettings settings;
    private volatile Collection<Buf<T>> buffer;
    private final Supplier<Externalizable> instanceBuilder;
    private long lastFlushTime;
    private volatile int wip;
    private volatile int remainder;
    private volatile int deadSize;
    private Disposable intervalFlush;
    private volatile Boolean disposed = false;

    public PersistenceBuffer(String filePath, String fileName, Supplier<T> newInstance, Function<Flux<T>, Mono<Boolean>> handler) {
        this(BufferSettings.create(filePath, fileName), newInstance, handler);
    }

    public PersistenceBuffer(String filePath, String fileName, Function<Flux<T>, Mono<Boolean>> handler) {
        this(filePath, fileName, null, handler);
    }

    public PersistenceBuffer(BufferSettings settings, Supplier<T> newInstance, Function<Flux<T>, Mono<Boolean>> handler) {
        Serializable data;
        this.instanceBuilder = newInstance != null ? ((data = (Serializable)newInstance.get()) instanceof Externalizable ? () -> (Externalizable)newInstance.get() : null) : null;
        this.settings = settings;
        this.handler = list -> Mono.defer(() -> (Mono)handler.apply((Flux)list));
    }

    public PersistenceBuffer<T> bufferSize(int size) {
        this.settings = this.settings.bufferSize(size);
        return this;
    }

    public PersistenceBuffer<T> bufferTimeout(Duration bufferTimeout) {
        this.settings = this.settings.bufferTimeout(bufferTimeout);
        return this;
    }

    public PersistenceBuffer<T> parallelism(int parallelism) {
        this.settings = this.settings.parallelism(parallelism);
        return this;
    }

    public PersistenceBuffer<T> maxRetry(int maxRetryTimes) {
        this.settings = this.settings.maxRetry(maxRetryTimes);
        return this;
    }

    public PersistenceBuffer<T> retryWhenError(Predicate<Throwable> predicate) {
        this.settings = this.settings.retryWhenError(predicate);
        return this;
    }

    public PersistenceBuffer<T> settings(Function<BufferSettings, BufferSettings> mapper) {
        this.settings = mapper.apply(this.settings);
        return this;
    }

    public PersistenceBuffer<T> name(String name) {
        this.name = name;
        this.logger = LoggerFactory.getLogger((String)(PersistenceBuffer.class.getName() + "." + name));
        return this;
    }

    static <T> FileQueue<Buf<T>> wrap(FileQueue<Buf<T>> queue) {
        return new FileQueueProxy<Buf<T>>(queue){

            public void clear() {
                super.flush();
            }
        };
    }

    private void init() {
        String filePath = this.settings.getFilePath();
        String fileName = this.settings.getFileName();
        Path path = Paths.get(filePath, new String[0]);
        fileName = fileName.replaceAll("[\\s\\\\/:*?\"<>|]", "_");
        BufDataType dataType = new BufDataType();
        this.queue = PersistenceBuffer.wrap(FileQueue.builder().name(fileName).path(path).option("valueType", (Object)dataType).build());
        this.remainder = this.queue.size();
        this.deadQueue = PersistenceBuffer.wrap(FileQueue.builder().name(fileName + ".dead").path(path).option("valueType", (Object)dataType).build());
        this.deadSize = this.deadQueue.size();
        this.buffer = this.newBuffer();
    }

    public void start() {
        if (this.intervalFlush != null) {
            return;
        }
        this.init();
        this.drain();
        if (!this.settings.getBufferTimeout().isZero()) {
            this.intervalFlush = Flux.interval((Duration)this.settings.getBufferTimeout()).doOnNext(ignore -> this.intervalFlush()).subscribe();
        }
    }

    private void dead(Collection<Buf<T>> buf) {
        if (this.deadQueue.addAll(buf)) {
            DEAD_SZIE.addAndGet(this, buf.size());
        }
    }

    private void dead(Buf<T> buf) {
        if (this.deadQueue.add(buf)) {
            DEAD_SZIE.incrementAndGet(this);
        }
    }

    private void requeue(Collection<Buf<T>> buffer) {
        for (Buf<T> buf : buffer) {
            if ((long)(++((Buf)buf).retry) >= this.settings.getMaxRetryTimes()) {
                this.dead(buf);
                continue;
            }
            if (!this.queue.offer(buf)) continue;
            REMAINDER.incrementAndGet(this);
        }
    }

    private void write(Buf<T> data) {
        REMAINDER.incrementAndGet(this);
        this.queue.offer(data);
        this.drain();
    }

    public void write(T data) {
        this.write((T)new Buf<T>(data, this.instanceBuilder));
    }

    public void dispose() {
        if (DISPOSED.compareAndSet(this, false, true)) {
            if (this.intervalFlush != null) {
                this.intervalFlush.dispose();
            }
            this.queue.addAll(BUFFER.getAndSet(this, this.newBuffer()));
            this.queue.close();
            this.deadQueue.close();
        }
    }

    public boolean isDisposed() {
        return DISPOSED.get(this);
    }

    public int size() {
        return this.remainder;
    }

    private void intervalFlush() {
        if (System.currentTimeMillis() - this.lastFlushTime >= this.settings.getBufferTimeout().toMillis() && WIP.get(this) <= this.settings.getParallelism()) {
            this.flush();
        }
    }

    private void flush(final Collection<Buf<T>> c) {
        try {
            this.lastFlushTime = System.currentTimeMillis();
            if (c.isEmpty()) {
                this.drain();
                return;
            }
            WIP.incrementAndGet(this);
            this.handler.apply(Flux.fromIterable(c).mapNotNull(buf -> (Serializable)((Buf)buf).data)).subscribe((CoreSubscriber)new BaseSubscriber<Boolean>(){
                final long startWith = System.currentTimeMillis();
                final int remainder = PersistenceBuffer.access$100().get(PersistenceBuffer.this);

                protected void hookOnNext(@Nonnull Boolean doRequeue) {
                    if (PersistenceBuffer.this.logger.isDebugEnabled()) {
                        PersistenceBuffer.this.logger.debug("write {} data,size:{},remainder:{},requeue: {}.take up time: {} ms", new Object[]{PersistenceBuffer.this.name, c.size(), this.remainder, doRequeue, System.currentTimeMillis() - this.startWith});
                    }
                    if (doRequeue.booleanValue()) {
                        PersistenceBuffer.this.requeue(c);
                    }
                }

                protected void hookOnError(@Nonnull Throwable err) {
                    if (PersistenceBuffer.this.settings.getRetryWhenError().test(err)) {
                        if (PersistenceBuffer.this.logger.isWarnEnabled()) {
                            PersistenceBuffer.this.logger.warn("write {} data failed do retry later,size:{},remainder:{}.use time: {} ms", new Object[]{PersistenceBuffer.this.name, c.size(), this.remainder, System.currentTimeMillis() - this.startWith});
                        }
                        PersistenceBuffer.this.requeue(c);
                    } else {
                        if (PersistenceBuffer.this.logger.isWarnEnabled()) {
                            PersistenceBuffer.this.logger.warn("write {} data error,size:{},remainder:{}.use time: {} ms", new Object[]{PersistenceBuffer.this.name, c.size(), this.remainder, System.currentTimeMillis() - this.startWith, err});
                        }
                        PersistenceBuffer.this.dead(c);
                    }
                }

                protected void hookFinally(@Nonnull SignalType type) {
                    WIP.decrementAndGet(PersistenceBuffer.this);
                    PersistenceBuffer.this.drain();
                }
            });
        }
        catch (Throwable e) {
            this.logger.warn("flush buffer error", e);
        }
    }

    private void flush() {
        Collection<Buf<T>> c = BUFFER.getAndSet(this, this.newBuffer());
        this.flush(c);
    }

    private Collection<Buf<T>> newBuffer() {
        return new ArrayList<Buf<T>>(this.settings.getBufferSize());
    }

    private void drain() {
        if (WIP.incrementAndGet(this) <= this.settings.getParallelism()) {
            Buf poll;
            int size = this.settings.getBufferSize();
            for (int i = 0; i < size && !this.isDisposed() && (poll = (Buf)this.queue.poll()) != null; ++i) {
                this.onNext(poll);
            }
        }
        WIP.decrementAndGet(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onNext(@Nonnull Buf<T> value) {
        Collection<Buf<T>> c;
        REMAINDER.decrementAndGet(this);
        boolean flush = false;
        PersistenceBuffer persistenceBuffer = this;
        synchronized (persistenceBuffer) {
            c = this.buffer();
            if (c.size() == this.settings.getBufferSize() - 1) {
                BUFFER.compareAndSet(this, c, this.newBuffer());
                flush = true;
            }
            c.add(value);
        }
        if (flush) {
            this.flush(c);
        }
    }

    private Collection<Buf<T>> buffer() {
        return BUFFER.get(this);
    }

    protected ObjectInput createInput(ByteBuf buffer) {
        return Serializers.getDefault().createInput((InputStream)new ByteBufInputStream(buffer, true));
    }

    protected ObjectOutput createOutput(ByteBuf buffer) {
        return Serializers.getDefault().createOutput((OutputStream)new ByteBufOutputStream(buffer));
    }

    public String getName() {
        return this.name;
    }

    static /* synthetic */ AtomicIntegerFieldUpdater access$100() {
        return REMAINDER;
    }

    class BufDataType
    extends BasicDataType<Buf<T>> {
        BufDataType() {
        }

        public int compare(Buf<T> a, Buf<T> b) {
            return 0;
        }

        public int getMemory(Buf<T> obj) {
            if (obj.data instanceof MemoryUsage) {
                return ((MemoryUsage)obj.data).usage();
            }
            if (obj.data instanceof String) {
                return ((String)obj.data).length() * 2;
            }
            return 10000;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void write(WriteBuffer buff, Buf<T> data) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
            try (ObjectOutput output = PersistenceBuffer.this.createOutput(buffer);){
                data.writeExternal(output);
                output.flush();
                buff.put(buffer.nioBuffer());
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)buffer);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void write(WriteBuffer buff, Object obj, int len) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
            try (ObjectOutput output = PersistenceBuffer.this.createOutput(buffer);){
                for (int i = 0; i < len; ++i) {
                    Buf buf = (Buf)Array.get(obj, i);
                    buf.writeExternal(output);
                }
                output.flush();
                buff.put(buffer.nioBuffer());
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)buffer);
            }
        }

        public void read(ByteBuffer buff, Object obj, int len) {
            try (ObjectInput input = PersistenceBuffer.this.createInput(Unpooled.wrappedBuffer((ByteBuffer)buff));){
                for (int i = 0; i < len; ++i) {
                    Buf data = new Buf(PersistenceBuffer.this.instanceBuilder);
                    data.readExternal(input);
                    Array.set(obj, i, data);
                }
            }
        }

        public Buf<T> read(ByteBuffer buff) {
            Buf data = new Buf(PersistenceBuffer.this.instanceBuilder);
            try (ObjectInput input = PersistenceBuffer.this.createInput(Unpooled.wrappedBuffer((ByteBuffer)buff));){
                data.readExternal(input);
            }
            return data;
        }

        public Buf<T>[] createStorage(int size) {
            return new Buf[size];
        }
    }

    public static class Buf<T>
    implements Externalizable {
        private final Supplier<Externalizable> instanceBuilder;
        private T data;
        private int retry = 0;

        public Buf() {
            throw new IllegalAccessException();
        }

        public Buf(Supplier<Externalizable> instanceBuilder) {
            this.instanceBuilder = instanceBuilder;
        }

        public Buf(T data, Supplier<Externalizable> instanceBuilder) {
            this.data = data;
            this.instanceBuilder = instanceBuilder;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(this.retry);
            if (this.instanceBuilder != null) {
                ((Externalizable)this.data).writeExternal(out);
            } else {
                SerializeUtils.writeObject(this.data, (ObjectOutput)out);
            }
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.retry = in.readInt();
            if (this.instanceBuilder != null) {
                Externalizable data = this.instanceBuilder.get();
                data.readExternal(in);
                this.data = data;
            } else {
                this.data = SerializeUtils.readObject((ObjectInput)in);
            }
        }

        public Buf(Supplier<Externalizable> instanceBuilder, T data, int retry) {
            this.instanceBuilder = instanceBuilder;
            this.data = data;
            this.retry = retry;
        }
    }
}

