/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChangelogRecordDeserializationHelper {
    public static final Logger log = LoggerFactory.getLogger(ChangelogRecordDeserializationHelper.class);
    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = new byte[]{0};
    public static final String CHANGELOG_VERSION_HEADER_KEY = "v";
    public static final String CHANGELOG_POSITION_HEADER_KEY = "c";
    public static final RecordHeader CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY = new RecordHeader("v", V_0_CHANGELOG_VERSION_HEADER_VALUE);

    public static void applyChecksAndUpdatePosition(ConsumerRecord<byte[], byte[]> record, boolean consistencyEnabled, Position position) {
        if (!consistencyEnabled) {
            return;
        }
        Header versionHeader = record.headers().lastHeader(CHANGELOG_VERSION_HEADER_KEY);
        if (versionHeader == null) {
            return;
        }
        switch (versionHeader.value()[0]) {
            case 0: {
                Header vectorHeader = record.headers().lastHeader(CHANGELOG_POSITION_HEADER_KEY);
                if (vectorHeader == null) {
                    throw new StreamsException("This should not happen. Consistency is enabled but the changelog contains records without consistency information.");
                }
                position.merge(PositionSerde.deserialize(ByteBuffer.wrap(vectorHeader.value())));
                break;
            }
            default: {
                log.warn("Changelog records have been encoded using a larger version than this server understands.Please upgrade your server.");
            }
        }
    }
}

