/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.event.dea.impl;

import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.api.resource.observation.ExternalResourceChangeListener;
import org.apache.sling.api.resource.observation.ResourceChange;
import org.apache.sling.api.resource.observation.ResourceChangeListener;
import org.apache.sling.event.dea.impl.ResourceHelper;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Re-posts distributed events on the local {@link EventAdmin}.
 *
 * <p>The sender registers itself as a {@link ResourceChangeListener} for
 * {@code ADDED} changes below the configured root path. Every reported path
 * that is not below this instance's own root path is queued. A single
 * background thread takes the paths from the queue, reads the resource,
 * converts it into an OSGi {@link Event} and posts it.
 *
 * <p>The class also implements {@link ExternalResourceChangeListener}
 * (presumably so that changes originating from other instances are delivered
 * as well - confirm against the Sling API documentation).
 *
 * <p>{@link #onChange(List)}, {@link #stop()} and
 * {@link #getPostedEventCounter()} may be called from any thread.
 */
public class DistributedEventSender
implements ResourceChangeListener,
ExternalResourceChangeListener {
    /** Resource type a resource must have to be treated as a distributed event. */
    private static final String RESOURCE_TYPE_EVENT = "sling/distributed/event";
    /** Property holding the event topic. */
    private static final String PROPERTY_TOPIC = "event.topics";
    /** Distribution flag; removed before the event is posted locally. */
    private static final String PROPERTY_DISTRIBUTE = "event.distribute";
    /** Resource type property name. */
    private static final String PROPERTY_RESOURCE_TYPE = "sling:resourceType";
    /** Property under which the event's original resource type value is stored. */
    private static final String PROPERTY_SAVED_RESOURCE_TYPE = "event.dea.sling:resourceType";
    /** Queue entry that only wakes up the background thread; it is never processed as a path. */
    private static final String STOP_MARKER = "";

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Only ever goes from {@code true} to {@code false}. */
    private volatile boolean running;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    private final ResourceResolverFactory resourceResolverFactory;
    private final EventAdmin eventAdmin;
    private final String ownRootPathWithSlash;
    /** Guards the hand-over of {@link #serviceRegistration} between the background thread and {@link #stop()}. */
    private final Object registrationLock = new Object();
    private volatile ServiceRegistration<ResourceChangeListener> serviceRegistration;
    private final AtomicInteger postedEventCounter = new AtomicInteger();

    /**
     * Creates the sender and starts its background thread. The background
     * thread registers the listener service and then processes queued paths
     * until {@link #stop()} is called or an unexpected error occurs.
     *
     * @param bundleContext bundle context used to register the listener service
     * @param rootPath path the listener is registered for
     * @param ownRootPath changes below this path (plus a trailing slash) are ignored
     * @param rrFactory factory used to obtain a service resource resolver per event
     * @param eventAdmin event admin the events are posted to; if {@code null},
     *                   an error is logged for every event instead
     */
    public DistributedEventSender(final BundleContext bundleContext, final String rootPath, String ownRootPath, ResourceResolverFactory rrFactory, EventAdmin eventAdmin) {
        this.eventAdmin = eventAdmin;
        this.resourceResolverFactory = rrFactory;
        this.ownRootPathWithSlash = ownRootPath + "/";
        this.running = true;
        Thread backgroundThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    // Registration is inside the try so that a failure is logged
                    // and reflected in the running flag.
                    if (DistributedEventSender.this.registerListener(bundleContext, rootPath)) {
                        DistributedEventSender.this.runInBackground();
                    }
                }
                catch (Throwable t) {
                    DistributedEventSender.this.logger.error("Background thread stopped with exception: {}", t.getMessage(), t);
                    DistributedEventSender.this.running = false;
                }
            }
        });
        backgroundThread.start();
    }

    /**
     * Registers this object as a resource change listener for {@code ADDED}
     * changes below {@code rootPath}.
     *
     * @return {@code true} if the registration was stored; {@code false} if
     *         {@link #stop()} had already been called, in which case the
     *         fresh registration is unregistered again right away
     */
    private boolean registerListener(BundleContext bundleContext, String rootPath) {
        Dictionary<String, Object> props = new Hashtable<String, Object>();
        props.put("service.vendor", "The Apache Software Foundation");
        props.put("resource.change.types", ResourceChange.ChangeType.ADDED.name());
        props.put("resource.paths", rootPath);
        ServiceRegistration<ResourceChangeListener> reg = bundleContext.registerService(ResourceChangeListener.class, this, props);

        boolean alreadyStopped;
        synchronized (this.registrationLock) {
            alreadyStopped = !this.running;
            if (!alreadyStopped) {
                this.serviceRegistration = reg;
            }
        }
        if (alreadyStopped) {
            // stop() ran before the registration existed and could not clean it up.
            // Unregister outside the lock: it calls into the framework.
            reg.unregister();
        }
        return !alreadyStopped;
    }

    /**
     * Stops the sender: marks it as no longer running, unregisters the
     * listener service if it is registered, and wakes up the background
     * thread so that it can terminate. Calling this method more than once is
     * harmless.
     */
    public void stop() {
        ServiceRegistration<ResourceChangeListener> reg;
        synchronized (this.registrationLock) {
            this.running = false;
            reg = this.serviceRegistration;
            this.serviceRegistration = null;
        }
        if (reg != null) {
            reg.unregister();
        }
        // offer() never blocks on the unbounded queue and, unlike put(), is not
        // skipped when the calling thread is interrupted.
        this.queue.offer(STOP_MARKER);
    }

    /**
     * Converts an event resource into an OSGi event.
     *
     * @param eventResource the resource holding the event properties
     * @return the event, or {@code null} if the resource has no topic, if
     *         read errors were recorded for its properties, if the value map
     *         could not be obtained, or if the event could not be constructed
     */
    private Event readEvent(Resource eventResource) {
        try {
            ValueMap vm = ResourceHelper.getValueMap(eventResource);
            String topic = vm.get(PROPERTY_TOPIC, String.class);
            if (topic == null) {
                this.logger.error("Unable to read distributed event from {} : no topic property available.", eventResource.getPath());
                return null;
            }

            Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
            @SuppressWarnings("unchecked")
            List<Exception> readErrorList = (List<Exception>)properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
            if (readErrorList != null) {
                for (Exception readError : readErrorList) {
                    this.logger.warn("Unable to read distributed event from {}", eventResource.getPath(), readError);
                }
                return null;
            }

            properties.remove(PROPERTY_TOPIC);
            properties.remove(PROPERTY_DISTRIBUTE);
            // Restore the saved resource type value, or drop the property if none was saved.
            Object oldRT = properties.remove(PROPERTY_SAVED_RESOURCE_TYPE);
            if (oldRT != null) {
                properties.put(PROPERTY_RESOURCE_TYPE, oldRT);
            } else {
                properties.remove(PROPERTY_RESOURCE_TYPE);
            }
            try {
                return new Event(topic, properties);
            }
            catch (IllegalArgumentException iae) {
                this.logger.error("Unable to read event: {}", iae.getMessage(), iae);
            }
        }
        catch (InstantiationException ie) {
            this.ignoreException(ie);
        }
        return null;
    }

    /**
     * Main loop of the background thread: blocks on the queue and processes
     * each non-empty path while the sender is running. An interrupt ends the
     * loop; the interrupt status is restored.
     */
    private void runInBackground() {
        while (this.running) {
            String path = null;
            try {
                path = this.queue.take();
            }
            catch (InterruptedException e) {
                this.ignoreException(e);
                Thread.currentThread().interrupt();
                this.running = false;
            }
            if (path != null && path.length() > 0 && this.running) {
                this.postEventFrom(path);
            }
        }
    }

    /**
     * Reads the resource at {@code path} with a service resource resolver
     * and, if it is a distributed event resource that can be read, posts the
     * event and increments the posted event counter. The resolver is always
     * closed; unchecked exceptions propagate to the caller.
     */
    private void postEventFrom(String path) {
        ResourceResolver resolver = null;
        try {
            resolver = this.resourceResolverFactory.getServiceResourceResolver(null);
            Resource eventResource = resolver.getResource(path);
            if (eventResource == null) {
                this.logger.warn("runInBackground : resource not found at {}", path);
                return;
            }
            if (!RESOURCE_TYPE_EVENT.equals(eventResource.getResourceType())) {
                return;
            }
            Event event = this.readEvent(eventResource);
            if (event == null) {
                return;
            }
            if (this.eventAdmin == null) {
                this.logger.error("Unable to post event as no event admin is available.");
                return;
            }
            this.eventAdmin.postEvent(event);
            this.postedEventCounter.incrementAndGet();
        }
        catch (LoginException ex) {
            this.logger.error("Exception during creation of resource resolver.", ex);
        }
        finally {
            // No jump statement here: a continue/return inside finally would
            // silently discard an exception that is still propagating.
            if (resolver != null) {
                resolver.close();
            }
        }
    }

    /**
     * Queues the path of every reported change that is not below this
     * instance's own root path. Changes are dropped once the sender is no
     * longer running, because nothing would take them from the queue.
     *
     * @param changes the reported resource changes
     */
    public void onChange(List<ResourceChange> changes) {
        if (!this.running) {
            return;
        }
        for (ResourceChange change : changes) {
            String path = change.getPath();
            if (path.startsWith(this.ownRootPathWithSlash)) {
                continue;
            }
            // offer() never blocks on the unbounded queue and does not drop the
            // path when the delivering thread happens to be interrupted.
            this.queue.offer(path);
        }
    }

    /** Logs an exception that is deliberately not handled further, at debug level. */
    private void ignoreException(Exception e) {
        this.logger.debug("Ignored exception {}", e.getMessage(), e);
    }

    /**
     * @return the number of events posted to the event admin so far
     */
    public int getPostedEventCounter() {
        return this.postedEventCounter.get();
    }
}

