Skip to content

Commit 764912c

Browse files
authored
Pipe: Fixed the issue that the DropPipe Connector subtask process waits too long for the callback thread to end (apache#15605)
1 parent 70ae1ac commit 764912c

File tree

2 files changed

+12
-1
lines changed

2 files changed

+12
-1
lines changed

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
234234

235235
highPriorityLockTaskCount.incrementAndGet();
236236
try {
237+
synchronized (highPriorityLockTaskCount) {
238+
highPriorityLockTaskCount.notifyAll();
239+
}
240+
237241
// synchronized to use the lastEvent & lastExceptionEvent
238242
synchronized (this) {
239243
// Here we discard the last event, and re-submit the pipe task to avoid that the pipe task

‎iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java‎

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,14 @@ private boolean onPipeConnectionException(final Throwable throwable) {
175175
MAX_RETRY_TIMES,
176176
e);
177177
try {
178-
Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
178+
synchronized (highPriorityLockTaskCount) {
179+
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
180+
// no deadlock.
181+
if (highPriorityLockTaskCount.get() == 0) {
182+
highPriorityLockTaskCount.wait(
183+
retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
184+
}
185+
}
179186
} catch (final InterruptedException interruptedException) {
180187
LOGGER.info(
181188
"Interrupted while sleeping, will retry to handshake with the target system.",

0 commit comments

Comments
 (0)