8000 Revert "Fix Horovod pyarrow IndexError: list index out of range (#3255)" by tgaddair · Pull Request #3265 · horovod/horovod · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Revert "Fix Horovod pyarrow IndexError: list index out of range (#3255)" #3265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions horovod/spark/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import contextlib
import os
import time

from multiprocessing.pool import ThreadPool
import pyarrow as pa
import numpy as np
import pyspark.sql.functions as f
Expand Down Expand Up @@ -541,41 +539,6 @@ def _train_val_split(df, validation):
return train_df, val_df, validation_ratio


# Upper bound (in seconds) on how long _wait_file_available polls for
# materialized parquet files to become visible in the store.  Defaults to 30
# and can be overridden via the FILE_AVAILABILITY_WAIT_TIMEOUT_SECS
# environment variable.
_FILE_AVAILABILITY_WAIT_TIMEOUT_SECS = \
    int(os.environ.get('FILE_AVAILABILITY_WAIT_TIMEOUT_SECS', '30'))


def _wait_file_available(store, url_list):
    """Wait up to ``_FILE_AVAILABILITY_WAIT_TIMEOUT_SECS`` seconds (default 30)
    for every URL in ``url_list`` to become readable from ``store``.

    This is useful for filesystems, such as S3, that provide only eventual
    consistency: files just written may not be immediately visible to readers.

    :param store: store object; must implement ``exists(path)``.
    :param url_list: list of file URLs to wait for.
    :raises RuntimeError: if any file is still missing after the timeout.
    """
    # Import LocalStore here to avoid circular import
    from horovod.spark.common.store import LocalStore
    # Local filesystems are strongly consistent; nothing to wait for.
    if isinstance(store, LocalStore):
        return

    # Nothing to check. This also avoids ThreadPool(0), which raises
    # ValueError ("Number of processes must be at least 1").
    if not url_list:
        return

    def wait_for_file(path):
        # Poll every 100ms until the file appears or the deadline passes.
        end_time = time.time() + _FILE_AVAILABILITY_WAIT_TIMEOUT_SECS
        while time.time() < end_time:
            if store.exists(path):
                return True
            time.sleep(0.1)
        return False

    # Check files concurrently, but cap the pool size for very large file lists.
    pool = ThreadPool(min(len(url_list), 64))
    try:
        results = pool.map(wait_for_file, url_list)
        failed_list = [url for url, result in zip(url_list, results) if not result]
        if failed_list:
            raise RuntimeError(
                'Timeout while waiting for all parquet-store files to appear at urls {failed_list}, '
                'Please check whether these files were saved successfully when materializing dataframe.'
                .format(failed_list=','.join(failed_list)))
    finally:
        pool.close()
        pool.join()


def _get_or_create_dataset(key, store, df, feature_columns, label_columns,
validation, sample_weight_col, compress_sparse,
num_partitions, num_processes, verbose):
Expand Down Expand Up @@ -639,12 +602,6 @@ def _get_or_create_dataset(key, store, df, feature_columns, label_columns,
.mode('overwrite') \
.parquet(val_data_path)

saved_file_list = list(train_df._jdf.inputFiles())
if val_df:
saved_file_list += list(val_df._jdf.inputFiles())

_wait_file_available(store, saved_file_list)

train_rows, val_rows, pq_metadata, avg_row_size = get_simple_meta_from_parquet(
store, label_columns, feature_columns, sample_weight_col, dataset_idx)

Expand Down
42 changes: 0 additions & 42 deletions test/utils/spark_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
import contextlib
import os
import platform
import pytest
import stat
import sys
import threading
import time

from tempfile import TemporaryDirectory

Expand All @@ -33,7 +30,6 @@

from horovod.runner.common.util import secret
from horovod.spark.common.store import LocalStore
from horovod.spark.common.util import _wait_file_available
from horovod.spark.driver.driver_service import SparkDriverService, SparkDriverClient
from horovod.spark.task.task_service import SparkTaskService, SparkTaskClient

Expand Down Expand Up @@ -236,41 +232,3 @@ def create_mnist_data(spark):

def create_test_data_from_schema(spark, data, schema):
    """Build a Spark DataFrame from ``data`` rows using the supplied ``schema``."""
    frame = spark.createDataFrame(data, schema=schema)
    return frame


def test_wait_file_available():
    """Exercise _wait_file_available: all files present, timeout on a missing
    file, and a file that appears while waiting.

    NOTE(review): this file calls ``_wait_file_available(url_list)`` with a
    single argument, while the implementation in horovod/spark/common/util.py
    shown elsewhere in this diff takes ``(store, url_list)`` — confirm which
    signature this test targets.
    """
    with tempdir() as d:
        pq_dir = os.path.join(d, 'test_ev')
        os.makedirs(pq_dir)
        file1_path = os.path.join(pq_dir, 'file1')
        file2_path = os.path.join(pq_dir, 'file2')
        # Normalize to forward slashes so the URLs are valid on Windows too.
        url1 = 'file://' + file1_path.replace(os.sep, '/')
        url2 = 'file://' + file2_path.replace(os.sep, '/')

        url_list = [url1, url2]

        def create_file(p):
            # Touch an empty file.
            with open(p, 'w'):
                pass

        # 1. test all files exist.
        create_file(file1_path)
        create_file(file2_path)
        _wait_file_available(url_list)

        # 2. test one file does not exist. Raise error.
        os.remove(file2_path)
        with pytest.raises(
                RuntimeError,
                match='Timeout while waiting for all parquet-store files to appear at urls'
        ):
            _wait_file_available(url_list)

        # 3. test one file becomes available after 1 second.
        def delay_create_file2():
            time.sleep(1)
            create_file(file2_path)

        # BUG FIX: the original passed target=delay_create_file2() — calling
        # the function immediately (sleeping and creating the file
        # synchronously on this thread) and giving Thread a None target.
        # Passing the callable itself makes the file appear concurrently
        # while _wait_file_available polls, which is what case 3 intends.
        threading.Thread(target=delay_create_file2).start()

        _wait_file_available(url_list)
0