/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.notify.manager.service;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.notify.manager.entity.Notification;
import org.jetlinks.community.notify.manager.entity.NotificationEntity;
import org.jetlinks.community.notify.manager.enums.NotificationState;
import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.dao.NonTransientDataAccessException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.CannotCreateTransactionException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
@ConfigurationProperties(prefix="jetlinks.notification")
public class NotificationService
extends GenericReactiveCrudService<NotificationEntity, String> {
    private static final Logger log = LoggerFactory.getLogger(NotificationService.class);
    private BufferProperties buffer = new BufferProperties();
    private PersistenceBuffer<NotificationEntity> writer;

    public NotificationService() {
        this.buffer.setFilePath("./data/notification-buffer");
        this.buffer.setSize(1000);
        this.buffer.setTimeout(Duration.ofSeconds(1L));
    }

    @PostConstruct
    public void init() {
        this.writer = new PersistenceBuffer(BufferSettings.create((BufferProperties)this.buffer), NotificationEntity::new, flux -> this.save((Publisher)flux).then(Reactors.ALWAYS_FALSE)).retryWhenError(err -> ErrorUtils.hasException((Throwable)err, (Class[])new Class[]{CannotCreateTransactionException.class, NonTransientDataAccessException.class, TimeoutException.class, IOException.class})).name("notification");
        this.writer.start();
    }

    @PreDestroy
    public void dispose() {
        this.writer.dispose();
    }

    @Subscribe(value={"/notifications/**"})
    public Mono<Void> subscribeNotifications(Notification notification) {
        this.writer.write((Serializable)((Object)NotificationEntity.from(notification)));
        return Mono.empty();
    }

    public Flux<NotificationEntity> findAndMarkRead(QueryParamEntity query) {
        return this.query(query).collectList().filter(CollectionUtils::isNotEmpty).flatMapMany(list -> ((ReactiveUpdate)((ReactiveUpdate)((ReactiveUpdate)((ReactiveUpdate)this.createUpdate().set(NotificationEntity::getState, (Object)NotificationState.read)).where()).in(GenericEntity::getId, (Collection)list.stream().map(GenericEntity::getId).collect(Collectors.toList()))).and(NotificationEntity::getState, (Object)NotificationState.unread)).execute().thenMany((Publisher)Flux.fromIterable((Iterable)list)).doOnNext(e -> e.setState(NotificationState.read)));
    }

    public BufferProperties getBuffer() {
        return this.buffer;
    }

    public void setBuffer(BufferProperties buffer) {
        this.buffer = buffer;
    }
}

