/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Objects;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class SubscriptionJoinProcessorSupplier<KLeft, KRight, VRight>
implements ProcessorSupplier<CombinedKey<KRight, KLeft>, Change<ValueAndTimestamp<SubscriptionWrapper<KLeft>>>, KLeft, SubscriptionResponseWrapper<VRight>> {
    private final KTableValueGetterSupplier<KRight, VRight> foreignValueGetterSupplier;

    public SubscriptionJoinProcessorSupplier(KTableValueGetterSupplier<KRight, VRight> foreignValueGetterSupplier) {
        this.foreignValueGetterSupplier = foreignValueGetterSupplier;
    }

    @Override
    public Processor<CombinedKey<KRight, KLeft>, Change<ValueAndTimestamp<SubscriptionWrapper<KLeft>>>, KLeft, SubscriptionResponseWrapper<VRight>> get() {
        return new ContextualProcessor<CombinedKey<KRight, KLeft>, Change<ValueAndTimestamp<SubscriptionWrapper<KLeft>>>, KLeft, SubscriptionResponseWrapper<VRight>>(){
            private KTableValueGetter<KRight, VRight> foreignValues;

            @Override
            public void init(ProcessorContext<KLeft, SubscriptionResponseWrapper<VRight>> context) {
                super.init(context);
                this.foreignValues = SubscriptionJoinProcessorSupplier.this.foreignValueGetterSupplier.get();
                this.foreignValues.init(context);
            }

            @Override
            public void process(Record<CombinedKey<KRight, KLeft>, Change<ValueAndTimestamp<SubscriptionWrapper<KLeft>>>> record) {
                Objects.requireNonNull(record.key(), "This processor should never see a null key.");
                Objects.requireNonNull(record.value(), "This processor should never see a null value.");
                ValueAndTimestamp valueAndTimestamp = (ValueAndTimestamp)record.value().newValue;
                Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
                SubscriptionWrapper value = this.subscriptionWrapper(valueAndTimestamp);
                ValueAndTimestamp foreignValueAndTime = record.key().foreignKey() == null ? null : this.foreignValues.get(record.key().foreignKey());
                long resultTimestamp = foreignValueAndTime == null ? valueAndTimestamp.timestamp() : Math.max(valueAndTimestamp.timestamp(), foreignValueAndTime.timestamp());
                switch (value.instruction()) {
                    case DELETE_KEY_AND_PROPAGATE: {
                        this.context().forward(record.withKey(record.key().primaryKey()).withValue(new SubscriptionResponseWrapper<Object>(value.hash(), null, value.primaryPartition())).withTimestamp(resultTimestamp));
                        break;
                    }
                    case PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE: {
                        Object valueToSend = foreignValueAndTime == null ? null : (Object)foreignValueAndTime.value();
                        this.context().forward(record.withKey(record.key().primaryKey()).withValue(new SubscriptionResponseWrapper<Object>(value.hash(), valueToSend, value.primaryPartition())).withTimestamp(resultTimestamp));
                        break;
                    }
                    case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE: {
                        if (foreignValueAndTime == null) break;
                        this.context().forward(record.withKey(record.key().primaryKey()).withValue(new SubscriptionResponseWrapper(value.hash(), foreignValueAndTime.value(), value.primaryPartition())).withTimestamp(resultTimestamp));
                        break;
                    }
                    case DELETE_KEY_NO_PROPAGATE: {
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unhandled instruction: " + String.valueOf((Object)value.instruction()));
                    }
                }
            }

            private SubscriptionWrapper<KLeft> subscriptionWrapper(ValueAndTimestamp<SubscriptionWrapper<KLeft>> valueAndTimestamp) {
                SubscriptionWrapper value = valueAndTimestamp.value();
                if (value.version() > 1) {
                    throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
                }
                return value;
            }
        };
    }
}

