/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.cache;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.cache.ReactiveCacheContainer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.ContextView;

class DefaultReactiveCacheContainer<K, V>
implements ReactiveCacheContainer<K, V> {
    private final Map<K, Container<K, V>> cache = new ConcurrentHashMap<K, Container<K, V>>();

    DefaultReactiveCacheContainer() {
    }

    @Override
    public Mono<V> compute(K key, BiFunction<K, V, Mono<V>> compute) {
        return this.cache.compute(key, (? super K k, ? super V old) -> {
            if (old == null) {
                Mono loader = (Mono)compute.apply(k, null);
                return new Container<Object, Mono>(k, (DefaultReactiveCacheContainer<Object, Mono>)this, loader);
            }
            old.update((Mono)compute.apply(k, old.loaded));
            return old;
        }).ref();
    }

    @Override
    public Mono<V> computeIfAbsent(K key, Function<K, Mono<V>> compute) {
        return this.cache.computeIfAbsent(key, (? super K k) -> {
            Mono loader = (Mono)compute.apply(k);
            return new Container<Object, Mono>(k, (DefaultReactiveCacheContainer<Object, Mono>)this, loader);
        }).ref();
    }

    @Override
    public Mono<V> get(K key, Mono<V> defaultValue) {
        Container<K, V> container = this.cache.get(key);
        if (container != null) {
            return container.ref();
        }
        return defaultValue;
    }

    @Override
    public V put(K key, V value) {
        Container<K, V> container = this.cache.put(key, new Container<K, V>(key, this, value));
        if (container != null) {
            container.dispose();
            return (V)container.loaded;
        }
        return null;
    }

    @Override
    public boolean containsKey(K key) {
        return this.cache.containsKey(key);
    }

    @Override
    public V getNow(K key) {
        Container<K, V> container = this.cache.get(key);
        if (container != null) {
            return (V)container.loaded;
        }
        return null;
    }

    @Override
    public V remove(K key) {
        Container<K, V> container = this.cache.remove(key);
        if (null != container) {
            container.dispose();
        }
        return container == null ? null : (V)container.loaded;
    }

    @Override
    public Flux<V> values() {
        return Flux.fromIterable(this.cache.values()).flatMap(Container::ref);
    }

    @Override
    public List<V> valuesNow() {
        return this.cache.values().stream().filter(c -> c.loaded != null).map(c -> c.loaded).collect(Collectors.toList());
    }

    @Override
    public void clear() {
        HashMap<K, Container<K, V>> cache = new HashMap<K, Container<K, V>>(this.cache);
        this.cache.clear();
        for (Container value : cache.values()) {
            value.dispose();
        }
    }

    @Override
    public void dispose() {
        this.cache.values().forEach(Container::dispose);
        this.cache.clear();
    }

    static class Container<K, T>
    implements Disposable {
        private static final AtomicReferenceFieldUpdater<Container, Mono> LOADER = AtomicReferenceFieldUpdater.newUpdater(Container.class, Mono.class, "loader");
        private final DefaultReactiveCacheContainer<K, T> main;
        private final K key;
        private Sinks.One<T> await;
        public volatile T loaded;
        protected volatile Mono<T> loader;
        private volatile Disposable disposable;

        public Container(K key, DefaultReactiveCacheContainer<K, T> main, Mono<T> loader) {
            this.key = key;
            this.main = main;
            this.loader = loader;
            this.update(loader);
        }

        public Container(K key, DefaultReactiveCacheContainer<K, T> main, T loaded) {
            this.key = key;
            this.main = main;
            this.loaded = loaded;
            this.loader = Mono.just(loaded);
            this.update(this.loader);
        }

        public void update(Mono<T> ref) {
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            Sinks.One<T> old = this.await;
            this.await = Sinks.one();
            if (old != null && old.currentSubscriberCount() > 0) {
                old.tryEmitEmpty();
            }
            this.loader = ref.switchIfEmpty(Mono.fromRunnable(this::loadEmpty)).doOnError(this::loadError).doOnNext(this::afterLoaded);
        }

        private void afterLoaded(T data) {
            if (data != this.loaded && this.loaded instanceof Disposable) {
                ((Disposable)this.loaded).dispose();
            }
            this.loaded = data;
            this.await.tryEmitValue(data);
        }

        @Override
        public void dispose() {
            T loaded;
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            if ((loaded = this.loaded) instanceof Disposable) {
                ((Disposable)loaded).dispose();
            }
        }

        private void loadError(Throwable err) {
            this.await.tryEmitError(err);
            this.main.remove(this.key);
        }

        private void loadEmpty() {
            this.await.tryEmitEmpty();
        }

        private void tryLoad(ContextView contextView) {
            Mono loader = LOADER.getAndSet(this, null);
            if (loader != null) {
                this.disposable = loader.contextWrite(contextView).subscribe();
            }
        }

        public Mono<T> ref() {
            return Mono.deferContextual(ctx -> {
                this.tryLoad((ContextView)ctx);
                return this.await.asMono();
            });
        }
    }
}

