u/Every_Lake7203

User Data Functions Fail Only with PySpark

Whenever I use PySpark to call User Data Functions, I get the following error.

I do not get this error when calling the same functions with a client outside the Spark environment. The payloads are small, maybe a few kB, and the UDFs return quickly: only a few seconds in any other context.

Seems like something on the Spark side is going wrong.

Has anyone else seen this and gotten help?

I submitted a support ticket, and they just told me to make very small batch requests. That completely defeats the purpose of this set of User Data Functions. The number of batch requests would also be enormous, because the error shows up at such a small data threshold.
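For reference, the workaround support suggested boils down to splitting nodeIds into batches, calling the function once per batch, and stitching the results back together. A minimal sketch, assuming the notebook call takes keyword arguments as in the trace below; `chunked` and `call_in_batches` are hypothetical helpers, and the `combine` step depends on what the notebook wrapper actually hands back (a pandas or a Spark DataFrame).

```python
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def call_in_batches(
    fn: Callable[..., R],
    node_ids: Sequence[str],
    batch_size: int,
    combine: Callable[[List[R]], R],
    **kwargs,
) -> R:
    """Call `fn` once per batch of node IDs and merge the partial results.

    Assumption: `fn` accepts the node list as the `nodeIds` keyword, as
    `retrieve_da_lmp` does in the traceback; all other arguments are
    forwarded unchanged to every call.
    """
    parts = [fn(nodeIds=batch, **kwargs) for batch in chunked(node_ids, batch_size)]
    return combine(parts)
```

In the notebook this would look something like `call_in_batches(myFunctions.retrieve_da_lmp, caiso_node_list, 5, pd.concat, isoName="CAISO", settlementStart="2026-01-01", settlementEnd="2026-01-10")`; if the wrapper returns Spark DataFrames, `lambda parts: functools.reduce(lambda a, b: a.unionByName(b), parts)` is the analogous combine. As the post says, the number of calls grows quickly when the batch size has to be tiny.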

For some additional context, this UDF returns a pd.DataFrame.

---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[11], line 2
      1 caiso_node_list = [valid_list_of_nodes]
----> 2 df_caiso_lmp = myFunctions.retrieve_da_lmp(
      3 isoName = "CAISO",
      4 nodeIds = caiso_node_list,
      5 settlementStart = "2026-01-01",
      6 settlementEnd = "2026-01-10"
      7 )
      8 df_caiso_lmp.head()

File ~/cluster-env/trident_env/lib/python3.11/site-packages/notebookutils/mssparkutils/handlers/udfHandler.py:171, in UDF.__create_dynamic_function.<locals>.dynamic_function(*args, **kwargs)
    169 workspace_id = self.__metadata.get("folderObjectId", "")
    170 capacity_id = self.__metadata.get("capacityObjectId", "")
--> 171 result = self.__udf_handler.run(artifact_id, name, parameters, workspace_id, capacity_id)
    172 return self.__udf_handler.formatResult(result, name, return_type)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/notebookutils/mssparkutils/handlers/udfHandler.py:34, in UdfHandler.run(self, artifact_id, function_name, parameters, workspace_id, capacity_id)
     31 workspace_id = self.getCurrentWorkspaceId()
     33 self.formatParameters(parameters)
---> 34 result = self.jvm.notebookutils.udf.run(artifact_id, function_name, parameters, workspace_id, capacity_id)
     35 if isinstance(result, str):
     36 return result

File ~/cluster-env/trident_env/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317 self.command_header +\
   1318 args_command +\
   1319 proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323 answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326 if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178 try:
--> 179 return f(*a, **kw)
    180 except Py4JJavaError as e:
    181 converted = convert_exception(e.java_exception)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
    327 "An error occurred while calling {0}{1}{2}.\n".
    328 format(target_id, ".", name), value)
    329 else:
    330 raise Py4JError(
    331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling z:notebookutils.udf.run.
: java.io.IOException: Unexpected end of stream trying to read message.
at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:701)
at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:57)
at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:202)
at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:185)
at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:176)
at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:73)
at org.apache.spark.sql.notebook.ArrowHelper$.arrowBytesToDataFrame(ArrowHelper.scala:299)
at com.microsoft.spark.notebook.msutils.impl.fabric.UdfUtilsImpl$.run(UdfUtilsImpl.scala:233)
at notebookutils.udf$.$anonfun$run$1(udf.scala:47)
at com.microsoft.spark.notebook.common.trident.CertifiedTelemetryUtils$.withTelemetry(CertifiedTelemetryUtils.scala:98)
at notebookutils.udf$.run(udf.scala:46)
at notebookutils.udf.run(udf.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
reddit.com
u/Every_Lake7203 — 8 days ago