/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.PubSubMessageListener;
import org.redisson.PubSubStatusListener;
import org.redisson.RedissonTopic;
import org.redisson.api.RFuture;
import org.redisson.api.RTopic;
import org.redisson.api.RTopicReactive;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;

/**
 * Reactive view of a topic.
 *
 * <p>Every operation is delegated to an internal {@link RTopic}; the
 * asynchronous ones are handed to {@link CommandReactiveExecutor#reactive}
 * so that callers receive a {@link Publisher} instead of an {@link RFuture}.
 *
 * @param <M> type of the messages exchanged over the topic
 */
public class RedissonTopicReactive<M>
implements RTopicReactive<M> {
    /** Delegate that performs the actual topic operations. */
    private final RTopic<M> topic;
    /** Converts the delegate's {@link RFuture} results into {@link Publisher}s. */
    private final CommandReactiveExecutor commandExecutor;
    /** Topic name as passed to the constructor; also reported by {@link #getChannelNames()}. */
    private final String name;

    /**
     * Creates a reactive topic that uses the codec configured on the
     * executor's connection manager.
     *
     * @param commandExecutor executor used to run and wrap the commands
     * @param name            topic name
     */
    public RedissonTopicReactive(CommandReactiveExecutor commandExecutor, String name) {
        this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
    }

    /**
     * Creates a reactive topic with an explicit codec.
     *
     * @param codec           codec handed to the underlying {@link RedissonTopic}
     * @param commandExecutor executor used to run and wrap the commands
     * @param name            topic name
     */
    public RedissonTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
        this.topic = new RedissonTopic(codec, commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.name = name;
    }

    /**
     * Returns the channel names of this topic.
     *
     * @return an immutable single-element list holding this topic's name
     */
    @Override
    public List<String> getChannelNames() {
        return Collections.singletonList(this.name);
    }

    /**
     * Publishes a message by delegating to {@link RTopic#publishAsync}.
     *
     * @param message message to publish
     * @return publisher of the value produced by {@code publishAsync}
     *         (NOTE(review): presumably the number of receiving clients; confirm
     *         against the {@link RTopic} contract)
     */
    @Override
    public Publisher<Long> publish(final M message) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Long>>(){

            @Override
            public RFuture<Long> get() {
                return RedissonTopicReactive.this.topic.publishAsync(message);
            }
        });
    }

    /**
     * Registers a status listener, wrapped in a {@link PubSubStatusListener}
     * bound to this topic's name.
     *
     * @param listener listener to register
     * @return publisher of the value produced by the registration
     *         (NOTE(review): presumably the id accepted by
     *         {@link #removeListener(int)}; confirm)
     */
    @Override
    public Publisher<Integer> addListener(StatusListener listener) {
        return this.addListener(new PubSubStatusListener(listener, this.name));
    }

    /**
     * Registers a message listener, wrapped in a {@link PubSubMessageListener}
     * bound to this topic's name.
     *
     * @param listener listener to register
     * @return publisher of the value produced by the registration
     *         (NOTE(review): presumably the id accepted by
     *         {@link #removeListener(int)}; confirm)
     */
    @Override
    public Publisher<Integer> addListener(MessageListener<M> listener) {
        PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, this.name);
        // The cast pins overload resolution to the private RedisPubSubListener variant.
        return this.addListener((RedisPubSubListener<?>)pubSubListener);
    }

    /**
     * Shared registration path for both public {@code addListener} overloads.
     *
     * <p>This helper is private and therefore overrides nothing, so it must not
     * carry {@code @Override}; the annotation would be rejected by the compiler.
     *
     * @param pubSubListener low-level listener to register on the topic
     * @return publisher of the value produced by {@code addListenerAsync}
     */
    private Publisher<Integer> addListener(final RedisPubSubListener<?> pubSubListener) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Integer>>(){

            @Override
            public RFuture<Integer> get() {
                // addListenerAsync is invoked on the RedissonTopic implementation
                // rather than through the RTopic-typed field, hence the cast.
                return ((RedissonTopic)RedissonTopicReactive.this.topic).addListenerAsync(pubSubListener);
            }
        });
    }

    /**
     * Removes a previously registered listener by delegating directly to
     * {@link RTopic#removeListener}; no {@link Publisher} is involved.
     *
     * @param listenerId id of the listener to remove
     */
    @Override
    public void removeListener(int listenerId) {
        this.topic.removeListener(listenerId);
    }
}

