package io.dapr.client;

import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.exceptions.DaprException;
import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/dapr/client/Subscription.class */
public class Subscription<T> implements Closeable {
    private final BlockingQueue<DaprProtos.SubscribeTopicEventsRequestAlpha1> ackQueue = new LinkedBlockingQueue(50);
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Semaphore receiverStateChange = new Semaphore(0);
    private Thread acker;
    private Thread receiver;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Subscription(DaprGrpc.DaprStub daprStub, DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1, SubscriptionListener<T> subscriptionListener, Function<DaprProtos.SubscribeTopicEventsResponseAlpha1, CloudEvent<T>> function) {
        AtomicReference atomicReference = new AtomicReference();
        this.acker = new Thread(() -> {
            while (this.running.get()) {
                try {
                    DaprProtos.SubscribeTopicEventsRequestAlpha1 take = this.ackQueue.take();
                    if (take != null) {
                        StreamObserver streamObserver = (StreamObserver) atomicReference.get();
                        if (streamObserver == null) {
                            Thread.sleep(1000L);
                        } else {
                            streamObserver.onNext(take);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e2) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e3) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        this.receiver = new Thread(() -> {
            while (this.running.get()) {
                StreamObserver subscribeTopicEventsAlpha1 = daprStub.subscribeTopicEventsAlpha1(new StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1>() { // from class: io.dapr.client.Subscription.1
                    public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 subscribeTopicEventsResponseAlpha1) {
                        String id;
                        try {
                            if (((StreamObserver) atomicReference.get()) == null) {
                                throw new RuntimeException("Cannot receive event: streaming subscription is not initialized.");
                            }
                            CloudEvent cloudEvent = (CloudEvent) function.apply(subscribeTopicEventsResponseAlpha1);
                            if (cloudEvent == null || (id = cloudEvent.getId()) == null || id.isEmpty()) {
                                return;
                            }
                            Subscription.onEvent(subscriptionListener, cloudEvent).subscribe(status -> {
                                try {
                                    Subscription.this.ackQueue.put(Subscription.buildAckRequest(id, status));
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            });
                        } catch (Exception e) {
                            onError(DaprException.propagate(e));
                        }
                    }

                    public void onError(Throwable th) {
                        subscriptionListener.onError(DaprException.propagate(th));
                    }

                    public void onCompleted() {
                        Subscription.this.receiverStateChange.release();
                    }
                });
                atomicReference.set(subscribeTopicEventsAlpha1);
                subscribeTopicEventsAlpha1.onNext(subscribeTopicEventsRequestAlpha1);
                try {
                    this.receiverStateChange.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.running.set(false);
                }
            }
        });
    }

    private static <T> Mono<SubscriptionListener.Status> onEvent(SubscriptionListener<T> subscriptionListener, CloudEvent<T> cloudEvent) {
        return subscriptionListener.onEvent(cloudEvent).onErrorMap(th -> {
            RuntimeException propagate = DaprException.propagate(th);
            subscriptionListener.onError(propagate);
            return propagate;
        }).onErrorReturn(SubscriptionListener.Status.RETRY);
    }

    @NotNull
    private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(String str, SubscriptionListener.Status status) {
        return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder().setEventProcessed(DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder().setId(str).setStatus(DaprAppCallbackProtos.TopicEventResponse.newBuilder().setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.valueOf(status.name())).build()).build()).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.receiver.start();
        this.acker.start();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.running.set(false);
        this.receiverStateChange.release();
        this.acker.interrupt();
    }

    public void awaitTermination() throws InterruptedException {
        this.receiver.join();
        this.acker.join();
    }
}
