package org.apache.spark.sql.execution.python;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.python.PythonFunction;
import org.apache.spark.api.python.PythonWorker;
import org.apache.spark.api.python.PythonWorkerFactory;
import org.apache.spark.api.python.PythonWorkerUtils$;
import org.apache.spark.api.python.SpecialLengths$;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKeys$MODULE_NAME$;
import org.apache.spark.internal.LogKeys$PYTHON_EXEC$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.internal.config.Python$;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.errors.QueryCompilationErrors$;
import org.apache.spark.sql.errors.QueryExecutionErrors$;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.ArrowUtils$;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.IterableOnceOps;
import scala.collection.Iterator;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: PythonStreamingSourceRunner.scala */
@ScalaSignature(bytes = "\u0006\u0005\t-r!\u0002\u001d:\u0011\u00031e!\u0002%:\u0011\u0003I\u0005\"\u0002)\u0002\t\u0003\t\u0006b\u0002*\u0002\u0005\u0004%\ta\u0015\u0005\u0007/\u0006\u0001\u000b\u0011\u0002+\t\u000fa\u000b!\u0019!C\u0001'\"1\u0011,\u0001Q\u0001\nQCqAW\u0001C\u0002\u0013\u00051\u000b\u0003\u0004\\\u0003\u0001\u0006I\u0001\u0016\u0005\b9\u0006\u0011\r\u0011\"\u0001T\u0011\u0019i\u0016\u0001)A\u0005)\"9a,\u0001b\u0001\n\u0003\u0019\u0006BB0\u0002A\u0003%A\u000bC\u0004a\u0003\t\u0007I\u0011A*\t\r\u0005\f\u0001\u0015!\u0003U\u0011\u001d\u0011\u0017A1A\u0005\u0002MCaaY\u0001!\u0002\u0013!f\u0001\u0002%:\u0001\u0011D\u0001b[\t\u0003\u0002\u0003\u0006I\u0001\u001c\u0005\tgF\u0011\t\u0011)A\u0005i\")\u0001+\u0005C\u0001u\"9a0\u0005b\u0001\n\u0003y\b\u0002CA\t#\u0001\u0006I!!\u0001\t\u0013\u0005M\u0011C1A\u0005\n\u0005U\u0001\u0002CA\u0010#\u0001\u0006I!a\u0006\t\u0011\u0005\u0005\u0012C1A\u0005\nMCq!a\t\u0012A\u0003%A\u000bC\u0005\u0002&E\u0011\r\u0011\"\u0003\u0002(!A\u0011qF\t!\u0002\u0013\tI\u0003C\u0005\u00022E\u0011\r\u0011\"\u0003\u00024!A\u0011QK\t!\u0002\u0013\t)\u0004C\u0005\u0002XE\u0011\r\u0011\"\u0003\u0002Z!A\u00111L\t!\u0002\u0013\t\t\u0005C\u0005\u0002^E\u0001\r\u0011\"\u0003\u0002`!I\u0011QN\tA\u0002\u0013%\u0011q\u000e\u0005\t\u0003w\n\u0002\u0015)\u0003\u0002b!I\u0011QP\tA\u0002\u0013%\u0011q\u0010\u0005\n\u0003\u0013\u000b\u0002\u0019!C\u0005\u0003\u0017C\u0001\"a$\u0012A\u0003&\u0011\u0011\u0011\u0005\n\u0003#\u000b\"\u0019!C\u0005\u00033B\u0001\"a%\u0012A\u0003%\u0011\u0011\t\u0005\n\u0003+\u000b\u0002\u0019!C\u0005\u0003/C\u0011\"!*\u0012\u0001\u0004%I!a*\t\u0011\u0005-\u0016\u0003)Q\u0005\u00033C\u0011\"!,\u0012\u0001\u0004%I!a,\t\u0013\u0005]\u0016\u00031A\u0005\n\u0005e\u0006\u0002CA_#\u0001\u0006K!!-\t\u000f\u0005}\u0016\u0003\"\u0001\u0002B\"9\u00111Y\t\u0005\u0002\u0005\u0015\u0007bBAd#\u0011\u0005\u0011Q\u0019\u0005\b\u0003\u0013\fB\u0011AAf\u0011\u001d\u0011I!\u0005C\u0001\u0005\u0017AqAa\u0004\u0012\t\u0003\t\t\rC\u0005\u0003\
u0012E\u0011\r\u0011\"\u0003\u0003\u0014!A!QE\t!\u0002\u0013\u0011)\u0002C\u0004\u0003(E!\tA!\u000b\u00027AKH\u000f[8o'R\u0014X-Y7j]\u001e\u001cv.\u001e:dKJ+hN\\3s\u0015\tQ4(\u0001\u0004qsRDwN\u001c\u0006\u0003yu\n\u0011\"\u001a=fGV$\u0018n\u001c8\u000b\u0005yz\u0014aA:rY*\u0011\u0001)Q\u0001\u0006gB\f'o\u001b\u0006\u0003\u0005\u000e\u000ba!\u00199bG\",'\"\u0001#\u0002\u0007=\u0014xm\u0001\u0001\u0011\u0005\u001d\u000bQ\"A\u001d\u00037AKH\u000f[8o'R\u0014X-Y7j]\u001e\u001cv.\u001e:dKJ+hN\\3s'\t\t!\n\u0005\u0002L\u001d6\tAJC\u0001N\u0003\u0015\u00198-\u00197b\u0013\tyEJ\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\u0019\u000ba#\u0013(J)&\u000bEjX(G\rN+Ek\u0018$V\u001d\u000e{\u0016\nR\u000b\u0002)B\u00111*V\u0005\u0003-2\u00131!\u00138u\u0003]Ie*\u0013+J\u00032{vJ\u0012$T\u000bR{f)\u0016(D?&#\u0005%A\u000bM\u0003R+5\u000bV0P\r\u001a\u001bV\tV0G+:\u001bu,\u0013#\u0002-1\u000bE+R*U?>3eiU#U?\u001a+fjQ0J\t\u0002\n!\u0003U!S)&#\u0016j\u0014(T?\u001a+fjQ0J\t\u0006\u0019\u0002+\u0011*U\u0013RKuJT*`\rVs5iX%EA\u0005q1iT'N\u0013R{f)\u0016(D?&#\u0015aD\"P\u001b6KEk\u0018$V\u001d\u000e{\u0016\n\u0012\u0011\u00029A\u0013VIR#U\u0007\"+Ei\u0018*F\u0007>\u0013FiU0O\u001fR{fiT+O\t\u0006i\u0002KU#G\u000bR\u001b\u0005*\u0012#`%\u0016\u001buJ\u0015#T?:{Ek\u0018$P+:#\u0005%\u0001\u0011O\u001f:{V)\u0014)U3~\u0003\u0016,\u0011*S\u001f^{&+R\"P%\u0012{&)\u0011+D\u0011\u0016\u001b\u0016!\t(P\u001d~+U\n\u0015+Z?BK\u0016I\u0015*P/~\u0013ViQ(S\t~\u0013\u0015\tV\"I\u000bN\u0003\u0013\u0001H#N!RKv\fU-B%J{uk\u0018*F\u0007>\u0013Fi\u0018\"B)\u000eCUiU\u0001\u001e\u000b6\u0003F+W0Q3\u0006\u0013&kT,`%\u0016\u001buJ\u0015#`\u0005\u0006#6\tS#TAM\u0019\u0011CS3\u0011\u0005\u0019LW\"A4\u000b\u0005!|\u0014\u0001C5oi\u0016\u0014h.\u00197\n\u0005)<'a\u0002'pO\u001eLgnZ\u0001\u0005MVt7\r\u0005\u0002nc6\taN\u0003\u0002;_*\u0011\u0001oP\u0001\u0004CBL\u0017B\u0001:o\u00059\u0001\u0016\u0010\u001e5p]\u001a+hn\u0019;j_:\fAb\\;uaV$8k\u00195f[\u0006\u0004\"!\u001e=\u000e\u0003YT!a^\u001f\u00
02\u000bQL\b/Z:\n\u0005e4(AC*ueV\u001cG\u000fV=qKR\u00191\u0010`?\u0011\u0005\u001d\u000b\u0002\"B6\u0015\u0001\u0004a\u0007\"B:\u0015\u0001\u0004!\u0018\u0001D<pe.,'/T8ek2,WCAA\u0001!\u0011\t\u0019!!\u0004\u000e\u0005\u0005\u0015!\u0002BA\u0004\u0003\u0013\tA\u0001\\1oO*\u0011\u00111B\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002\u0010\u0005\u0015!AB*ue&tw-A\u0007x_J\\WM]'pIVdW\rI\u0001\u0005G>tg-\u0006\u0002\u0002\u0018A!\u0011\u0011DA\u000e\u001b\u0005y\u0014bAA\u000f\u007f\tI1\u000b]1sW\u000e{gNZ\u0001\u0006G>tg\rI\u0001\u000bEV4g-\u001a:TSj,\u0017a\u00032vM\u001a,'oU5{K\u0002\n\u0011#Y;uQN{7m[3u)&lWm\\;u+\t\tI\u0003E\u0002L\u0003WI1!!\fM\u0005\u0011auN\\4\u0002%\u0005,H\u000f[*pG.,G\u000fV5nK>,H\u000fI\u0001\bK:4h+\u0019:t+\t\t)\u0004\u0005\u0005\u00028\u0005u\u0012\u0011IA!\u001b\t\tID\u0003\u0003\u0002<\u0005%\u0011\u0001B;uS2LA!a\u0010\u0002:\t\u0019Q*\u00199\u0011\t\u0005\r\u0013\u0011\u000b\b\u0005\u0003\u000b\ni\u0005E\u0002\u0002H1k!!!\u0013\u000b\u0007\u0005-S)\u0001\u0004=e>|GOP\u0005\u0004\u0003\u001fb\u0015A\u0002)sK\u0012,g-\u0003\u0003\u0002\u0010\u0005M#bAA(\u0019\u0006AQM\u001c<WCJ\u001c\b%\u0001\u0006qsRDwN\\#yK\u000e,\"!!\u0011\u0002\u0017ALH\u000f[8o\u000bb,7\rI\u0001\raf$\bn\u001c8X_J\\WM]\u000b\u0003\u0003C\u0002RaSA2\u0003OJ1!!\u001aM\u0005\u0019y\u0005\u000f^5p]B\u0019Q.!\u001b\n\u0007\u0005-dN\u0001\u0007QsRDwN\\,pe.,'/\u0001\tqsRDwN\\,pe.,'o\u0018\u0013fcR!\u0011\u0011OA<!\rY\u00151O\u0005\u0004\u0003kb%\u0001B+oSRD\u0011\"!\u001f#\u0003\u0003\u0005\r!!\u0019\u0002\u0007a$\u0013'A\u0007qsRDwN\\,pe.,'\u000fI\u0001\u0014af$\bn\u001c8X_J\\WM\u001d$bGR|'/_\u000b\u0003\u0003\u0003\u0003RaSA2\u0003\u0007\u00032!\\AC\u0013\r\t9I\u001c\u0002\u0014!f$\bn\u001c8X_J\\WM\u001d$bGR|'/_\u0001\u0018af$\bn\u001c8X_J\\WM\u001d$bGR|'/_0%KF$B!!\u001d\u0002\u000e\"I\u0011\u0011P\u0013\u0002\u0002\u0003\u0007\u0011\u0011Q\u0001\u0015af$\bn\u001c8X_J\\WM\u001d$bGR|'/\u001f\u0011\u0002\u0013ALH\u000f[8o-\u0016\u0014\u0018A\u00039zi\"|gNV3sA\u00059A-\u0019;b\u00
1fV$XCAAM!\u0011\tY*!)\u000e\u0005\u0005u%\u0002BAP\u0003\u0013\t!![8\n\t\u0005\r\u0016Q\u0014\u0002\u0011\t\u0006$\u0018mT;uaV$8\u000b\u001e:fC6\f1\u0002Z1uC>+Ho\u0018\u0013fcR!\u0011\u0011OAU\u0011%\tIHKA\u0001\u0002\u0004\tI*\u0001\u0005eCR\fw*\u001e;!\u0003\u0019!\u0017\r^1J]V\u0011\u0011\u0011\u0017\t\u0005\u00037\u000b\u0019,\u0003\u0003\u00026\u0006u%a\u0004#bi\u0006Le\u000e];u'R\u0014X-Y7\u0002\u0015\u0011\fG/Y%o?\u0012*\u0017\u000f\u0006\u0003\u0002r\u0005m\u0006\"CA=[\u0005\u0005\t\u0019AAY\u0003\u001d!\u0017\r^1J]\u0002\nA!\u001b8jiR\u0011\u0011\u0011O\u0001\rY\u0006$Xm\u001d;PM\u001a\u001cX\r\u001e\u000b\u0003\u0003\u0003\nQ\"\u001b8ji&\fGn\u00144gg\u0016$\u0018A\u00039beRLG/[8ogR1\u0011Q\u001aB\u0001\u0005\u000b\u0001raSAh\u0003'\f\t/C\u0002\u0002R2\u0013a\u0001V;qY\u0016\u0014\u0004#B&\u0002V\u0006e\u0017bAAl\u0019\n)\u0011I\u001d:bsB)1*!6\u0002\\B\u00191*!8\n\u0007\u0005}GJ\u0001\u0003CsR,\u0007#B&\u0002d\u0005\r\bCBAs\u0003_\f)P\u0004\u0003\u0002h\u0006-h\u0002BA$\u0003SL\u0011!T\u0005\u0004\u0003[d\u0015a\u00029bG.\fw-Z\u0005\u0005\u0003c\f\u0019P\u0001\u0005Ji\u0016\u0014\u0018\r^8s\u0015\r\ti\u000f\u0014\t\u0005\u0003o\fi0\u0004\u0002\u0002z*\u0019\u00111`\u001f\u0002\u0011\r\fG/\u00197zgRLA!a@\u0002z\nY\u0011J\u001c;fe:\fGNU8x\u0011\u001d\u0011\u0019A\ra\u0001\u0003\u0003\nQa\u001d;beRDqAa\u00023\u0001\u0004\t\t%A\u0002f]\u0012\faaY8n[&$H\u0003BA9\u0005\u001bAqAa\u00024\u0001\u0004\t\t%\u0001\u0003ti>\u0004\u0018!C1mY>\u001c\u0017\r^8s+\t\u0011)\u0002\u0005\u0003\u0003\u0018\t\u0005RB\u0001B\r\u0015\u0011\u0011YB!\b\u0002\r5,Wn\u001c:z\u0015\r\u0011y\"Q\u0001\u0006CJ\u0014xn^\u0005\u0005\u0005G\u0011IBA\bCk\u001a4WM]!mY>\u001c\u0017\r^8s\u0003)\tG\u000e\\8dCR|'\u000fI\u0001\u0017e\u0016\fG-\u0011:s_^\u0014VmY8sI\n\u000bGo\u00195fgR\u0011\u00111\u001d")
/* loaded from: input_file:org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.class */
public class PythonStreamingSourceRunner implements Logging {
    /*
     * Runs a Python streaming data source in a separate Python worker and talks to it over
     * the worker's socket.
     *
     * init() launches the worker for the module named by workerModule and sends it the
     * Python function, the output schema (as JSON) and the Arrow batch size. Afterwards each
     * request method (latestOffset, initialOffset, partitions, commit) writes a function id
     * and its arguments to dataOut, flushes, and reads an int from dataIn that is either
     * SpecialLengths.PYTHON_EXCEPTION_THROWN (followed by a UTF error message) or a
     * request-specific length/status value.
     *
     * This file is decompiled from PythonStreamingSourceRunner.scala, which is why it contains
     * Scala accessor methods (name() / name_$eq()), Logging trait forwarders and public
     * synthetic $anonfun$ methods that back the lambdas.
     *
     * NOTE(review): requests share the dataOut/dataIn streams and nothing here synchronizes
     * access to them; callers presumably issue one request at a time - confirm.
     */

    /** Python function describing the data source; shipped to the worker in init(). */
    private final PythonFunction func;
    /** Schema the worker's Arrow batches are expected to have; sent to the worker as JSON. */
    private final StructType outputSchema;
    /** Name of the Python module the worker is started with. */
    private final String workerModule;
    /** Configuration taken from the current SparkEnv at construction time. */
    private final SparkConf conf;
    /** Buffer size used for the socket streams and exported as SPARK_BUFFER_SIZE. */
    private final int bufferSize;
    /** Value exported to the worker as SPARK_AUTH_SOCKET_TIMEOUT. */
    private final long authSocketTimeout;
    /** Environment of the Python function; init() adds Spark-specific entries to this map. */
    private final Map<String, String> envVars;
    /** Python executable reported by the Python function. */
    private final String pythonExec;
    /** The launched worker; None until init() succeeds in creating it. */
    private Option<PythonWorker> pythonWorker;
    /** Factory that created the worker; None until init() has created the worker. */
    private Option<PythonWorkerFactory> pythonWorkerFactory;
    /** Python version reported by the Python function; written to the worker in init(). */
    private final String pythonVer;
    /** Stream to the worker; null until init() runs. */
    private DataOutputStream dataOut;
    /** Stream from the worker; null until init() runs. */
    private DataInputStream dataIn;
    /** Arrow allocator handed to the ArrowStreamReader in readArrowRecordBatches(). */
    private final BufferAllocator allocator;
    /** Backing field required by the Logging trait. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // ---------------------------------------------------------------------------------------
    // Static forwarders to the constants held by the companion singleton
    // PythonStreamingSourceRunner$.MODULE$ (function ids sent to the worker and status codes
    // read back from it).
    // ---------------------------------------------------------------------------------------

    public static int EMPTY_PYARROW_RECORD_BATCHES() {
        return PythonStreamingSourceRunner$.MODULE$.EMPTY_PYARROW_RECORD_BATCHES();
    }

    public static int NON_EMPTY_PYARROW_RECORD_BATCHES() {
        return PythonStreamingSourceRunner$.MODULE$.NON_EMPTY_PYARROW_RECORD_BATCHES();
    }

    public static int PREFETCHED_RECORDS_NOT_FOUND() {
        return PythonStreamingSourceRunner$.MODULE$.PREFETCHED_RECORDS_NOT_FOUND();
    }

    public static int COMMIT_FUNC_ID() {
        return PythonStreamingSourceRunner$.MODULE$.COMMIT_FUNC_ID();
    }

    public static int PARTITIONS_FUNC_ID() {
        return PythonStreamingSourceRunner$.MODULE$.PARTITIONS_FUNC_ID();
    }

    public static int LATEST_OFFSET_FUNC_ID() {
        return PythonStreamingSourceRunner$.MODULE$.LATEST_OFFSET_FUNC_ID();
    }

    public static int INITIAL_OFFSET_FUNC_ID() {
        return PythonStreamingSourceRunner$.MODULE$.INITIAL_OFFSET_FUNC_ID();
    }

    // ---------------------------------------------------------------------------------------
    // Logging trait forwarders: each method delegates to the matching static helper on
    // Logging, passing this instance.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public Logging.LogStringContext LogStringContext(StringContext stringContext) {
        return Logging.LogStringContext$(this, stringContext);
    }

    public void withLogContext(HashMap<String, String> hashMap, Function0<BoxedUnit> function0) {
        Logging.withLogContext$(this, hashMap, function0);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logInfo(LogEntry logEntry) {
        Logging.logInfo$(this, logEntry);
    }

    public void logInfo(LogEntry logEntry, Throwable th) {
        Logging.logInfo$(this, logEntry, th);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logDebug(LogEntry logEntry) {
        Logging.logDebug$(this, logEntry);
    }

    public void logDebug(LogEntry logEntry, Throwable th) {
        Logging.logDebug$(this, logEntry, th);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logTrace(LogEntry logEntry) {
        Logging.logTrace$(this, logEntry);
    }

    public void logTrace(LogEntry logEntry, Throwable th) {
        Logging.logTrace$(this, logEntry, th);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logWarning(LogEntry logEntry) {
        Logging.logWarning$(this, logEntry);
    }

    public void logWarning(LogEntry logEntry, Throwable th) {
        Logging.logWarning$(this, logEntry, th);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logError(LogEntry logEntry) {
        Logging.logError$(this, logEntry);
    }

    public void logError(LogEntry logEntry, Throwable th) {
        Logging.logError$(this, logEntry, th);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ---------------------------------------------------------------------------------------
    // Scala-style field accessors (getter name(), setter name_$eq()).
    // ---------------------------------------------------------------------------------------

    /** @return the name of the Python module the worker is started with */
    public String workerModule() {
        return this.workerModule;
    }

    private SparkConf conf() {
        return this.conf;
    }

    private int bufferSize() {
        return this.bufferSize;
    }

    private long authSocketTimeout() {
        return this.authSocketTimeout;
    }

    private Map<String, String> envVars() {
        return this.envVars;
    }

    private String pythonExec() {
        return this.pythonExec;
    }

    private Option<PythonWorker> pythonWorker() {
        return this.pythonWorker;
    }

    private void pythonWorker_$eq(Option<PythonWorker> option) {
        this.pythonWorker = option;
    }

    private Option<PythonWorkerFactory> pythonWorkerFactory() {
        return this.pythonWorkerFactory;
    }

    private void pythonWorkerFactory_$eq(Option<PythonWorkerFactory> option) {
        this.pythonWorkerFactory = option;
    }

    private String pythonVer() {
        return this.pythonVer;
    }

    private DataOutputStream dataOut() {
        return this.dataOut;
    }

    private void dataOut_$eq(DataOutputStream dataOutputStream) {
        this.dataOut = dataOutputStream;
    }

    private DataInputStream dataIn() {
        return this.dataIn;
    }

    private void dataIn_$eq(DataInputStream dataInputStream) {
        this.dataIn = dataInputStream;
    }

    /**
     * Starts the Python worker and performs the initial handshake.
     *
     * <p>Adds SPARK_LOCAL_DIRS, SPARK_AUTH_SOCKET_TIMEOUT and SPARK_BUFFER_SIZE to the
     * environment map, creates a worker through a new {@code PythonWorkerFactory}, then
     * writes the Python version, Spark files, the Python function, the output schema JSON and
     * the Arrow max-records-per-batch setting to the worker. Finally reads one int back.
     *
     * <p>Note that the entries are put into the map returned by the Python function's
     * {@code envVars()}, so that map itself is modified.
     *
     * @throws MatchError if the factory returns no worker
     * @throws RuntimeException the error built by
     *     {@code QueryCompilationErrors.pythonDataSourceError} when the worker answers with
     *     {@code PYTHON_EXCEPTION_THROWN}; the worker's message is read from the stream
     */
    public void init() {
        PythonWorker pythonWorker;
        logInfo(LogEntry$.MODULE$.from(() -> {
            return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Initializing Python runner pythonExec: ", ""}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$PYTHON_EXEC$.MODULE$, this.pythonExec())}));
        }));
        // Comma-separated paths of the block manager's local directories.
        envVars().put("SPARK_LOCAL_DIRS", Predef$.MODULE$.wrapRefArray((Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(SparkEnv$.MODULE$.get().blockManager().diskBlockManager().localDirs()), file -> {
            return file.getPath();
        }, ClassTag$.MODULE$.apply(String.class))).mkString(","));
        envVars().put("SPARK_AUTH_SOCKET_TIMEOUT", Long.toString(authSocketTimeout()));
        envVars().put("SPARK_BUFFER_SIZE", Integer.toString(bufferSize()));
        PythonWorkerFactory pythonWorkerFactory = new PythonWorkerFactory(pythonExec(), workerModule(), CollectionConverters$.MODULE$.MapHasAsScala(envVars()).asScala().toMap($less$colon$less$.MODULE$.refl()), false);
        Tuple2 createSimpleWorker = pythonWorkerFactory.createSimpleWorker(true);
        if (createSimpleWorker == null || (pythonWorker = (PythonWorker) createSimpleWorker._1()) == null) {
            throw new MatchError(createSimpleWorker);
        }
        // Remember worker and factory before any stream I/O so that stop() can still shut
        // them down if the handshake below fails.
        pythonWorker_$eq(new Some(pythonWorker));
        pythonWorkerFactory_$eq(new Some(pythonWorkerFactory));
        dataOut_$eq(new DataOutputStream(new BufferedOutputStream(((PythonWorker) pythonWorker().get()).channel().socket().getOutputStream(), bufferSize())));
        PythonWorkerUtils$.MODULE$.writePythonVersion(pythonVer(), dataOut());
        PythonWorkerUtils$.MODULE$.writeSparkFiles(new Some("streaming_job"), CollectionConverters$.MODULE$.ListHasAsScala(this.func.pythonIncludes()).asScala().toSet(), dataOut());
        PythonWorkerUtils$.MODULE$.writePythonFunction(this.func, dataOut());
        PythonWorkerUtils$.MODULE$.writeUTF(this.outputSchema.json(), dataOut());
        dataOut().writeInt(SQLConf$.MODULE$.get().arrowMaxRecordsPerBatch());
        dataOut().flush();
        dataIn_$eq(new DataInputStream(new BufferedInputStream(((PythonWorker) pythonWorker().get()).channel().socket().getInputStream(), bufferSize())));
        if (dataIn().readInt() == SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            throw QueryCompilationErrors$.MODULE$.pythonDataSourceError("plan", "initialize source", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        }
    }

    /**
     * Sends a {@code LATEST_OFFSET_FUNC_ID} request to the worker and returns its reply.
     *
     * @return the string read from the worker; the int that precedes it is passed to
     *     {@code readUTF} (presumably the encoded length - confirm against PythonWorkerUtils)
     * @throws RuntimeException the error built by
     *     {@code pythonStreamingDataSourceRuntimeError("latestOffset", ...)} when the worker
     *     answers with {@code PYTHON_EXCEPTION_THROWN}
     */
    public String latestOffset() {
        dataOut().writeInt(PythonStreamingSourceRunner$.MODULE$.LATEST_OFFSET_FUNC_ID());
        dataOut().flush();
        int readInt = dataIn().readInt();
        if (readInt != SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            return PythonWorkerUtils$.MODULE$.readUTF(readInt, dataIn());
        }
        throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("latestOffset", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
    }

    /**
     * Sends an {@code INITIAL_OFFSET_FUNC_ID} request to the worker and returns its reply.
     *
     * @return the string read from the worker; the int that precedes it is passed to
     *     {@code readUTF} (presumably the encoded length - confirm against PythonWorkerUtils)
     * @throws RuntimeException the error built by
     *     {@code pythonStreamingDataSourceRuntimeError("initialOffset", ...)} when the worker
     *     answers with {@code PYTHON_EXCEPTION_THROWN}
     */
    public String initialOffset() {
        dataOut().writeInt(PythonStreamingSourceRunner$.MODULE$.INITIAL_OFFSET_FUNC_ID());
        dataOut().flush();
        int readInt = dataIn().readInt();
        if (readInt != SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            return PythonWorkerUtils$.MODULE$.readUTF(readInt, dataIn());
        }
        throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("initialOffset", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
    }

    /**
     * Sends a {@code PARTITIONS_FUNC_ID} request with two string arguments and reads back the
     * partitions plus, optionally, prefetched rows.
     *
     * <p>The reply starts with an int count followed by that many byte arrays, then a status
     * int selecting the second tuple element.
     *
     * @param str first string written to the worker (presumably the start offset; the
     *     original parameter name was lost in decompilation - confirm)
     * @param str2 second string written to the worker (presumably the end offset - confirm)
     * @return a pair of (byte arrays read from the worker, rows): {@code Some(rows)} read via
     *     {@link #readArrowRecordBatches()} for {@code NON_EMPTY_PYARROW_RECORD_BATCHES},
     *     {@code Some(empty iterator)} for {@code EMPTY_PYARROW_RECORD_BATCHES}, and
     *     {@code None} for {@code PREFETCHED_RECORDS_NOT_FOUND}
     * @throws RuntimeException the error built by
     *     {@code pythonStreamingDataSourceRuntimeError("planPartitions", ...)} when the worker
     *     reports an exception or sends an unknown status code
     */
    public Tuple2<byte[][], Option<Iterator<InternalRow>>> partitions(String str, String str2) {
        Some some;
        dataOut().writeInt(PythonStreamingSourceRunner$.MODULE$.PARTITIONS_FUNC_ID());
        PythonWorkerUtils$.MODULE$.writeUTF(str, dataOut());
        PythonWorkerUtils$.MODULE$.writeUTF(str2, dataOut());
        dataOut().flush();
        ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
        // First int: number of byte arrays that follow, or the exception marker.
        int readInt = dataIn().readInt();
        if (readInt == SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("planPartitions", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        }
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), readInt).foreach(obj -> {
            return $anonfun$partitions$1(this, empty, BoxesRunTime.unboxToInt(obj));
        });
        // Second int: status code describing whether prefetched rows follow.
        int readInt2 = dataIn().readInt();
        if (PythonStreamingSourceRunner$.MODULE$.NON_EMPTY_PYARROW_RECORD_BATCHES() == readInt2) {
            some = new Some(readArrowRecordBatches());
        } else if (PythonStreamingSourceRunner$.MODULE$.PREFETCHED_RECORDS_NOT_FOUND() == readInt2) {
            some = None$.MODULE$;
        } else {
            if (PythonStreamingSourceRunner$.MODULE$.EMPTY_PYARROW_RECORD_BATCHES() != readInt2) {
                if (SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN() != readInt2) {
                    throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("planPartitions", "unknown status code " + readInt2);
                }
                throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("planPartitions", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
            }
            some = new Some(package$.MODULE$.Iterator().empty());
        }
        return new Tuple2<>(empty.toArray(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE))), some);
    }

    /**
     * Sends a {@code COMMIT_FUNC_ID} request carrying one string argument.
     *
     * @param str string written to the worker (presumably the offset being committed -
     *     confirm against the Scala source)
     * @throws RuntimeException the error built by
     *     {@code pythonStreamingDataSourceRuntimeError("commitSource", ...)} when the worker
     *     answers with {@code PYTHON_EXCEPTION_THROWN}
     */
    public void commit(String str) {
        dataOut().writeInt(PythonStreamingSourceRunner$.MODULE$.COMMIT_FUNC_ID());
        PythonWorkerUtils$.MODULE$.writeUTF(str, dataOut());
        dataOut().flush();
        if (dataIn().readInt() == SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("commitSource", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        }
    }

    /**
     * Stops the worker and its factory, if init() created them.
     *
     * <p>Best effort: any {@link Exception} raised while stopping is logged and not
     * rethrown. Does nothing beyond logging when no factory or no worker has been recorded.
     */
    public void stop() {
        logInfo(LogEntry$.MODULE$.from(() -> {
            return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Stopping streaming runner for module: "}))).log(Nil$.MODULE$).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"", "."}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$MODULE_NAME$.MODULE$, this.workerModule())})));
        }));
        try {
            pythonWorkerFactory().foreach(pythonWorkerFactory -> {
                $anonfun$stop$2(this, pythonWorkerFactory);
                return BoxedUnit.UNIT;
            });
        } catch (Exception e) {
            logError(() -> {
                return "Exception when trying to kill worker";
            }, e);
        }
    }

    private BufferAllocator allocator() {
        return this.allocator;
    }

    /**
     * Reads one Arrow stream from the worker and returns its rows.
     *
     * <p>Expects a leading status int equal to {@code START_ARROW_STREAM}. Every batch is
     * loaded in turn and each of its rows is copied into a buffer, so the returned iterator
     * does not depend on the reader after this method returns.
     *
     * <p>The reader is released in a {@code finally} block so that a failed schema check or a
     * failure while loading batches does not leave it open.
     *
     * @return an iterator over copies of all rows of all batches in the stream
     * @throws RuntimeException the error built by
     *     {@code pythonStreamingDataSourceRuntimeError("prefetchArrowBatches", ...)} when the
     *     worker reports an exception or sends an unknown status code
     */
    public Iterator<InternalRow> readArrowRecordBatches() {
        int readInt = dataIn().readInt();
        if (SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN() == readInt) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("prefetchArrowBatches", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        }
        if (SpecialLengths$.MODULE$.START_ARROW_STREAM() != readInt) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("prefetchArrowBatches", "unknown status code " + readInt);
        }
        ArrowStreamReader arrowStreamReader = new ArrowStreamReader(dataIn(), allocator());
        try {
            VectorSchemaRoot vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
            StructType fromArrowSchema = ArrowUtils$.MODULE$.fromArrowSchema(vectorSchemaRoot.getSchema());
            Predef$ predef$ = Predef$.MODULE$;
            StructType structType = this.outputSchema;
            // Null-safe equality: the stream's schema must match the declared output schema.
            predef$.assert(fromArrowSchema != null ? fromArrowSchema.equals(structType) : structType == null);
            ColumnVector[] columnVectorArr = (ColumnVector[]) ((IterableOnceOps) CollectionConverters$.MODULE$.ListHasAsScala(vectorSchemaRoot.getFieldVectors()).asScala().map(fieldVector -> {
                return new ArrowColumnVector(fieldVector);
            })).toArray(ClassTag$.MODULE$.apply(ColumnVector.class));
            ArrayBuffer arrayBuffer = (ArrayBuffer) ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
            while (arrowStreamReader.loadNextBatch()) {
                ColumnarBatch columnarBatch = new ColumnarBatch(columnVectorArr);
                columnarBatch.setNumRows(vectorSchemaRoot.getRowCount());
                // Rows are copied before the next batch is loaded into the same vectors.
                arrayBuffer.appendAll(CollectionConverters$.MODULE$.IteratorHasAsScala(columnarBatch.rowIterator()).asScala().map(internalRow -> {
                    return internalRow.copy();
                }));
            }
            return arrayBuffer.iterator();
        } finally {
            // close(false) rather than close(): dataIn is still needed for later requests
            // (the flag presumably controls closing the read source - see Arrow docs).
            arrowStreamReader.close(false);
        }
    }

    /** Lambda body for partitions(): reads one byte array from the worker and appends it. */
    public static final /* synthetic */ ArrayBuffer $anonfun$partitions$1(PythonStreamingSourceRunner pythonStreamingSourceRunner, ArrayBuffer arrayBuffer, int i) {
        return arrayBuffer.append(PythonWorkerUtils$.MODULE$.readBytes(pythonStreamingSourceRunner.dataIn()));
    }

    /**
     * Lambda body for stop(): stops the worker, then the factory. The factory is stopped in a
     * {@code finally} block so that a failure to stop the worker does not skip it; the
     * exception still propagates to stop(), which logs it.
     */
    public static final /* synthetic */ void $anonfun$stop$3(PythonWorkerFactory pythonWorkerFactory, PythonWorker pythonWorker) {
        try {
            pythonWorkerFactory.stopWorker(pythonWorker);
        } finally {
            pythonWorkerFactory.stop();
        }
    }

    /** Lambda body for stop(): applies $anonfun$stop$3 to the recorded worker, if any. */
    public static final /* synthetic */ void $anonfun$stop$2(PythonStreamingSourceRunner pythonStreamingSourceRunner, PythonWorkerFactory pythonWorkerFactory) {
        pythonStreamingSourceRunner.pythonWorker().foreach(pythonWorker -> {
            $anonfun$stop$3(pythonWorkerFactory, pythonWorker);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Creates a runner without starting the worker; call {@link #init()} before issuing
     * requests.
     *
     * <p>Reads the buffer size and auth socket timeout from the current SparkEnv's
     * configuration, so a SparkEnv must be available when this constructor runs.
     *
     * @param pythonFunction Python function describing the source; also supplies the
     *     environment map, Python executable and Python version
     * @param structType schema the worker's output is expected to have
     */
    public PythonStreamingSourceRunner(PythonFunction pythonFunction, StructType structType) {
        this.func = pythonFunction;
        this.outputSchema = structType;
        Logging.$init$(this);
        this.workerModule = "pyspark.sql.streaming.python_streaming_source_runner";
        this.conf = SparkEnv$.MODULE$.get().conf();
        this.bufferSize = BoxesRunTime.unboxToInt(conf().get(org.apache.spark.internal.config.package$.MODULE$.BUFFER_SIZE()));
        this.authSocketTimeout = BoxesRunTime.unboxToLong(conf().get(Python$.MODULE$.PYTHON_AUTH_SOCKET_TIMEOUT()));
        this.envVars = pythonFunction.envVars();
        this.pythonExec = pythonFunction.pythonExec();
        this.pythonWorker = None$.MODULE$;
        this.pythonWorkerFactory = None$.MODULE$;
        this.pythonVer = pythonFunction.pythonVer();
        this.dataOut = null;
        this.dataIn = null;
        this.allocator = ArrowUtils$.MODULE$.rootAllocator().newChildAllocator("stream reader for " + pythonExec(), 0L, Long.MAX_VALUE);
    }
}
