/*
 * Decompiled with CFR 0.152.
 */
package com.tplink.smb.eventcenter.domain;

import com.tplink.smb.eventcenter.api.event.DomainEvent;
import com.tplink.smb.eventcenter.api.event.DomainEventBus;
import com.tplink.smb.eventcenter.api.event.DomainEventExecutor;
import com.tplink.smb.eventcenter.api.event.DomainEventSubscriber;
import com.tplink.smb.eventcenter.core.util.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

public class DefaultDomainEventBus
implements DomainEventBus {
    private static final Logger log = LoggerFactory.getLogger(DefaultDomainEventBus.class);
    private static final ConcurrentHashMap<Class, AtomicReference<Set<DomainEventExecutor>>> EVENT_SUBSCRIBERS_ASYNC_GROUP = new ConcurrentHashMap(32);
    private static final ConcurrentHashMap<Class, AtomicReference<Set<DomainEventExecutor>>> EVENT_SUBSCRIBERS_SYNC_GROUP = new ConcurrentHashMap(32);
    private static final DefaultDomainEventBus INSTANCE = new DefaultDomainEventBus();

    private DefaultDomainEventBus() {
    }

    public static DefaultDomainEventBus instance() {
        return INSTANCE;
    }

    public void publish(DomainEvent event) {
        this.loopDomainEvent(event, this::callSubscribersAsync);
    }

    public void publishSync(DomainEvent event) {
        this.loopDomainEvent(event, this::callSubscribersSync);
    }

    public <D> void register(Class<D> dClass, DomainEventSubscriber<D> subscriber, ExecutorService executor) {
        AtomicReference<Set<DomainEventExecutor>> atomicReference = EVENT_SUBSCRIBERS_ASYNC_GROUP.getOrDefault(dClass, new AtomicReference());
        EVENT_SUBSCRIBERS_ASYNC_GROUP.put(dClass, this.doRegister(subscriber, atomicReference, executor));
    }

    public <D> void register(Class<D> dClass, DomainEventSubscriber<D> subscriber) {
        AtomicReference atomicReference = EVENT_SUBSCRIBERS_ASYNC_GROUP.computeIfAbsent(dClass, set -> new AtomicReference());
        EVENT_SUBSCRIBERS_ASYNC_GROUP.put(dClass, this.doRegister(subscriber, atomicReference, null));
    }

    public <D> void registerSync(Class<D> dClass, DomainEventSubscriber<D> subscriber) {
        AtomicReference<Set<DomainEventExecutor>> atomicReference = EVENT_SUBSCRIBERS_SYNC_GROUP.getOrDefault(dClass, new AtomicReference());
        EVENT_SUBSCRIBERS_SYNC_GROUP.put(dClass, this.doRegister(subscriber, atomicReference, null));
    }

    public <D> void unRegister(DomainEventSubscriber<D> subscriber) {
        for (Map.Entry<Class, AtomicReference<Set<DomainEventExecutor>>> entry : EVENT_SUBSCRIBERS_ASYNC_GROUP.entrySet()) {
            this.doUnRegister(subscriber, entry);
        }
        for (Map.Entry<Class, AtomicReference<Set<DomainEventExecutor>>> entry : EVENT_SUBSCRIBERS_SYNC_GROUP.entrySet()) {
            this.doUnRegister(subscriber, entry);
        }
    }

    private <D> AtomicReference<Set<DomainEventExecutor>> doRegister(DomainEventSubscriber<D> subscriber, AtomicReference<Set<DomainEventExecutor>> atomicReference, ExecutorService executor) {
        HashSet<Object> newSubscribers;
        Set<DomainEventExecutor> oldSubscribers;
        boolean isUpdate;
        do {
            oldSubscribers = atomicReference.get();
            newSubscribers = CollectionUtils.isEmpty((Collection)atomicReference.get()) ? new HashSet() : new HashSet<DomainEventExecutor>(oldSubscribers);
            newSubscribers.add(new DomainEventExecutor(subscriber, (Executor)executor));
        } while (!(isUpdate = atomicReference.compareAndSet(oldSubscribers, newSubscribers)));
        return atomicReference;
    }

    private <D> void doUnRegister(DomainEventSubscriber<D> subscriber, Map.Entry<Class, AtomicReference<Set<DomainEventExecutor>>> entry) {
        HashSet<Object> newSubscribers;
        Set<DomainEventExecutor> oldSubscribers;
        boolean isUpdate;
        AtomicReference<Set<DomainEventExecutor>> atomicReference = entry.getValue();
        do {
            oldSubscribers = atomicReference.get();
            newSubscribers = CollectionUtils.isEmpty((Collection)atomicReference.get()) ? new HashSet() : new HashSet<DomainEventExecutor>(oldSubscribers);
            newSubscribers.removeIf(domainEventExecutor -> Objects.equals(subscriber, domainEventExecutor.getSubscriber()));
        } while (!(isUpdate = atomicReference.compareAndSet(oldSubscribers, newSubscribers)));
    }

    private void loopDomainEvent(DomainEvent event, BiConsumer<DomainEvent, Class<?>> consumer) {
        Class<?> eventClass = event.getClass();
        while (DomainEvent.class.isAssignableFrom(eventClass)) {
            consumer.accept(event, eventClass);
            eventClass = eventClass.getSuperclass();
        }
        for (Class eventInterface : ClassUtils.getAllInterfaces(event.getClass())) {
            if (!DomainEvent.class.isAssignableFrom(eventInterface)) continue;
            consumer.accept(event, eventInterface);
        }
    }

    private void callSubscribersAsync(DomainEvent event, Class<?> eventClass) {
        AtomicReference<Set<DomainEventExecutor>> atomicReference = EVENT_SUBSCRIBERS_ASYNC_GROUP.get(eventClass);
        if (Objects.isNull(atomicReference) || CollectionUtils.isEmpty((Collection)atomicReference.get())) {
            return;
        }
        log.debug("Calling subscribers for event [{}]", eventClass);
        for (DomainEventExecutor subscriber : atomicReference.get()) {
            if (!subscriber.getSubscriber().accept((Object)event)) continue;
            try {
                if (subscriber.getExecutor() != null) {
                    subscriber.getExecutor().execute(() -> this.tryHandle(event, subscriber.getSubscriber()));
                    continue;
                }
                ExecutorServiceHolder.getHandlerPool().execute(() -> this.tryHandle(event, subscriber.getSubscriber()));
            }
            catch (Exception e) {
                log.error("callSubscribersAsync fail", (Throwable)e);
            }
        }
    }

    private void callSubscribersSync(DomainEvent event, Class<?> eventClass) {
        AtomicReference<Set<DomainEventExecutor>> atomicReference = EVENT_SUBSCRIBERS_SYNC_GROUP.get(eventClass);
        if (Objects.isNull(atomicReference) || CollectionUtils.isEmpty((Collection)atomicReference.get())) {
            return;
        }
        log.debug("Calling subscribers for event [{}]", eventClass);
        for (DomainEventExecutor subscriber : atomicReference.get()) {
            if (!subscriber.getSubscriber().accept((Object)event)) continue;
            this.tryHandle(event, subscriber.getSubscriber());
        }
    }

    private void tryHandle(DomainEvent event, DomainEventSubscriber subscriber) {
        try {
            subscriber.handleEvent((Object)event);
        }
        catch (Exception e) {
            log.error("Event handle error. subscriber:{}", (Object)subscriber, (Object)e);
            try {
                subscriber.onEventFailed((Object)event);
            }
            catch (Exception e1) {
                log.error("Event failed callback failed. subscriber:{}", (Object)subscriber, (Object)e1);
                return;
            }
        }
        try {
            subscriber.onEventSuccess((Object)event);
        }
        catch (Exception e) {
            log.warn("Event success callback failed. subscriber:{}", (Object)subscriber, (Object)e);
        }
    }

    private static class ExecutorServiceHolder {
        private static final ExecutorService HANDLER_POOL = new ThreadPoolExecutor(16, 300, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100000), new ThreadFactoryBuilder().setNameFormat("eventbus-handler-pool-%s").build());

        private ExecutorServiceHolder() {
        }

        private static ExecutorService getHandlerPool() {
            return HANDLER_POOL;
        }
    }
}

