vine: more detailed cache-invalid messages by JinZhou5042 · Pull Request #4147 · cooperative-computing-lab/cctools · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

vine: more detailed cache-invalid messages #4147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
23 changes: 15 additions & 8 deletions taskvine/src/worker/vine_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,19 +532,20 @@ static int do_worker_transfer(struct vine_cache *c, struct vine_cache_file *f, c
{
int port_num;
char addr[VINE_LINE_MAX], source_path[VINE_LINE_MAX];
int stoptime;
struct link *worker_link;

// expect the form: workerip://host:port/path/to/file
sscanf(f->source, "workerip://%256[^:]:%d/%s", addr, &port_num, source_path);

debug(D_VINE, "cache: setting up worker transfer file %s", f->source);

stoptime = time(0) + 15;
worker_link = link_connect(addr, port_num, stoptime);
timestamp_t time_start_connect = timestamp_get();
worker_link = link_connect(addr, port_num, time(0) + 300);
timestamp_t time_end_connect = timestamp_get();
int time_duration_connect = (time_end_connect - time_start_connect) / 1e6;

if (worker_link == NULL) {
*error_message = string_format("Could not establish connection with worker at: %s:%d", addr, port_num);
*error_message = string_format("Could not establish connection with worker at: %s:%d after %d seconds: %s", addr, port_num, time_duration_connect, strerror(errno));
return 0;
}

Expand All @@ -562,8 +563,10 @@ static int do_worker_transfer(struct vine_cache *c, struct vine_cache_file *f, c
int64_t totalsize;
int mode, mtime;

if (!vine_transfer_request_any(worker_link, source_path, transfer_dir, &totalsize, &mode, &mtime, time(0) + 900)) {
*error_message = string_format("Could not transfer file from %s", f->source);
if (!vine_transfer_request_any(worker_link, source_path, transfer_dir, &totalsize, &mode, &mtime, time(0) + 900, error_message)) {
if (error_message && *error_message == NULL) {
*error_message = string_format("Could not transfer file from %s", f->source);
}
link_close(worker_link);
return 0;
}
Expand Down Expand Up @@ -643,7 +646,11 @@ static void vine_cache_worker_process(struct vine_cache_file *f, struct vine_cac
char *error_path = vine_cache_error_path(c, cachename);
FILE *file = fopen(error_path, "w");
if (file) {
fprintf(file, "error creating file at worker: %s\n", error_message);
if (f->cache_type == VINE_CACHE_MINI_TASK) {
fprintf(file, "error creating file via mini task: %s\n", error_message);
} else {
fprintf(file, "error transferring file: %s\n", error_message);
}
fclose(file);
}
free(error_path);
Expand Down Expand Up @@ -839,7 +846,7 @@ static void vine_cache_check_outputs(struct vine_cache *c, struct vine_cache_fil
if (copy_file_to_buffer(error_path, &error_message, &error_length) > 0 && error_message) {
/* got a message string */
} else {
error_message = strdup("unknown error");
error_message = string_format("Failed to copy error message to buffer: %s", strerror(errno));
}

vine_worker_send_cache_invalid(manager, cachename, error_message);
Expand Down
51 changes: 42 additions & 9 deletions taskvine/src/worker/vine_transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ static int vine_transfer_get_file_internal(struct link *lnk, const char *filenam
return 1;
}

static int vine_transfer_get_dir_internal(struct link *lnk, const char *dirname, int64_t *totalsize, int mode, int mtime, time_t stoptime);
static int vine_transfer_get_dir_internal(struct link *lnk, const char *dirname, int64_t *totalsize, int mode, int mtime, time_t stoptime, char **error_message);

/*
Receive a single item of unknown type into the directory "dirname".
Expand All @@ -246,16 +246,21 @@ Returns 1 on successful transfer of one item.
Returns 2 on successful receipt of "end" of list.
*/

int vine_transfer_get_any(struct link *lnk, const char *dirname, int64_t *totalsize, int *mode, int *mtime, time_t stoptime)
int vine_transfer_get_any(struct link *lnk, const char *dirname, int64_t *totalsize, int *mode, int *mtime, time_t stoptime, char **error_message)
{
char line[VINE_LINE_MAX];
char name_encoded[VINE_LINE_MAX];
char name[VINE_LINE_MAX];
int64_t size;
int errornum;

if (!recv_message(lnk, line, sizeof(line), stoptime))
if (!recv_message(lnk, line, sizeof(line), stoptime)) {
/* network error before getting any message type */
if (error_message && *error_message == NULL) {
*error_message = string_format("Failed to receive message from worker: %s", strerror(errno));
}
return 0;
}

int r = 0;

Expand All @@ -268,6 +273,9 @@ int vine_transfer_get_any(struct link *lnk, const char *dirname, int64_t *totals

char *subname = string_format("%s/%s", dirname, name);
r = vine_transfer_get_file_internal(lnk, subname, size, *mode, *mtime, stoptime);
if (!r && error_message && *error_message == NULL) {
*error_message = string_format("Failed processing file '%s': %s", name, strerror(errno));
}
free(subname);

*totalsize += size;
Expand All @@ -278,6 +286,9 @@ int vine_transfer_get_any(struct link *lnk, const char *dirname, int64_t *totals

char *subname = string_format("%s/%s", dirname, name);
r = vine_transfer_get_symlink_internal(lnk, subname, size, stoptime);
if (!r && error_message && *error_message == NULL) {
*error_message = string_format("Failed processing symlink '%s': %s", name, strerror(errno));
}
free(subname);

// The symlink doesn't really have an inherent mtime or mode
Expand All @@ -293,16 +304,35 @@ int vine_transfer_get_any(struct link *lnk, const char *dirname, int64_t *totals
*mode &= 0777;

char *subname = string_format("%s/%s", dirname, name);
r = vine_transfer_get_dir_internal(lnk, subname, totalsize, *mode, *mtime, stoptime);
r = vine_transfer_get_dir_internal(lnk, subname, totalsize, *mode, *mtime, stoptime, error_message);
if (!r && error_message && *error_message == NULL) {
*error_message = string_format("Failed processing directory '%s': %s", name, strerror(errno));
}
free(subname);

} else if (sscanf(line, "error %s %d", name_encoded, &errornum) == 2) {

debug(D_VINE, "unable to transfer %s: %s", name_encoded, strerror(errornum));
url_decode(name_encoded, name, sizeof(name));

const char *err_str = strerror(errornum);
debug(D_VINE, "Received error from peer for '%s': %s", name, err_str);

if (error_message && *error_message == NULL) {
*error_message = string_format("Remote worker reported error for '%s': %s", name, err_str);
}
r = 0;

} else if (!strcmp(line, "end")) {
r = 2;
} else {
char line_preview[101];
snprintf(line_preview, sizeof(line_preview), "%.100s", line);

debug(D_VINE, "Received invalid line from peer: %s", line_preview);
if (error_message && *error_message == NULL) {
*error_message = string_format("Received invalid line from peer: %s", line_preview);
}
r = 0;
}

return r;
Expand All @@ -315,22 +345,25 @@ and now we process "file" and "dir" commands within the list
until "end" is reached.
*/

static int vine_transfer_get_dir_internal(struct link *lnk, const char *dirname, int64_t *totalsize, int mode, int mtime, time_t stoptime)
static int vine_transfer_get_dir_internal(struct link *lnk, const char *dirname, int64_t *totalsize, int mode, int mtime, time_t stoptime, char **error_message)
{
/* Only use the normal mode bits. */
mode &= 0777;

int result = mkdir(dirname, mode);
if (result < 0) {
debug(D_VINE, "unable to create %s: %s", dirname, strerror(errno));
if (error_message && *error_message == NULL) {
*error_message = string_format("Unable to create directory '%s': %s", dirname, strerror(errno));
}
return 0;
}

while (1) {
/* dummy values that won't get used. */
int temp_mode, temp_mtime;

int r = vine_transfer_get_any(lnk, dirname, totalsize, &temp_mode, &temp_mtime, stoptime);
int r = vine_transfer_get_any(lnk, dirname, totalsize, &temp_mode, &temp_mtime, stoptime, error_message);
if (r == 1) {
// Successfully received one item.
continue;
Expand All @@ -346,8 +379,8 @@ static int vine_transfer_get_dir_internal(struct link *lnk, const char *dirname,
return 0;
}

int vine_transfer_request_any(struct link *lnk, const char *request_path, const char *dirname, int64_t *totalsize, int *mode, int *mtime, time_t stoptime)
int vine_transfer_request_any(struct link *lnk, const char *request_path, const char *dirname, int64_t *totalsize, int *mode, int *mtime, time_t stoptime, char **error_message)
{
send_message(lnk, "get %s\n", request_path);
return vine_transfer_get_any(lnk, dirname, totalsize, mode, mtime, stoptime);
return vine_transfer_get_any(lnk, dirname, totalsize, mode, mtime, stoptime, error_message);
}
4 changes: 2 additions & 2 deletions taskvine/src/worker/vine_transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ int vine_transfer_put_any( struct link *lnk, struct vine_cache *cache, const cha

/* Receive a named file/dir from the connection to a local transfer path. */

int vine_transfer_get_any(struct link *lnk, const char *dirname, int64_t *totalsize, int *mode, int *mtime, time_t stoptime);
int vine_transfer_get_any(struct link *lnk, const char *dirname, int64_t *totalsize, int *mode, int *mtime, time_t stoptime, char **error_message);

/* Request an item by name, and then receive it in the same way as vine_transfer_get_any. */

int vine_transfer_request_any(struct link *lnk, const char *request_name, const char *dirname, int64_t *totalsize, int *mode, int *mtime, time_t stoptime);
int vine_transfer_request_any(struct link *lnk, const char *request_name, const char *dirname, int64_t *totalsize, int *mode, int *mtime, time_t stoptime, char **error_message);

#endif
3 changes: 2 additions & 1 deletion taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,8 @@ static int do_put(struct link *manager, const char *cachename, vine_cache_level_
char *transfer_path = vine_cache_transfer_path(cache_manager, cachename);

timestamp_t start = timestamp_get();
int r = vine_transfer_get_any(manager, transfer_dir, &actual_size, &mode, &mtime, time(0) + options->active_timeout);
char *error_message = NULL;
int r = vine_transfer_get_any(manager, transfer_dir, &actual_size, &mode, &mtime, time(0) + options->active_timeout, &error_message);
timestamp_t stop = timestamp_get();

/* XXX actual_size should equal expected size, but only for a simple file, not a dir. */
Expand Down
0