Commit fc266e55 authored by Mitch Burnett's avatar Mitch Burnett Committed by GitHub
Browse files

Merge pull request #9 from mitchburnett/master

Updates from May commissioning
parents 376bb2e2 dafece46
......@@ -516,3 +516,41 @@ void run_beamformer(signed char * data_in, float * data_out) {
// cudaFree(d_data);
// cudaFree(d_outputs);
}
void rtbfCleanup() {
// Free up GPU memory at the end of a program
if (d_beamformed != NULL) {
cudaFree(d_beamformed);
}
if (d_data != NULL) {
cudaFree(d_data);
}
if (d_data1 != NULL) {
cudaFree(d_data1);
}
if (d_outputs != NULL) {
cudaFree(d_outputs);
}
if (d_weights != NULL) {
cudaFree(d_weights);
}
if (d_arr_A != NULL) {
cudaFree(d_arr_A);
}
if (d_arr_B != NULL) {
cudaFree(d_arr_B);
}
if (d_arr_C != NULL) {
cudaFree(d_arr_C);
}
// Free up and release cublas handle
cublasDestroy(handle);
}
......@@ -38,6 +38,7 @@ void bf_get_weight_filename(char * weight_filename);
long long unsigned int bf_get_xid();
void update_weights(char * filename);
void init_beamformer();
void rtbfCleanup();
void run_beamformer(signed char * data_in, float * data_out);
#ifdef __cplusplus
}
......
......@@ -196,6 +196,11 @@ hashpipe_thread_run(void *vp_args)
rv = THREAD_ERROR;
}
}
if(args->ibuf != NULL) {
printf("HASH: Clearning input buffer for %s ...\n", args->thread_desc->name);
hashpipe_databuf_clear(args->ibuf);
}
pthread_cleanup_push((void *)hashpipe_databuf_detach, args->ibuf);
if(args->thread_desc->obuf_desc.create) {
args->obuf = hashpipe_databuf_attach(args->instance_id, args->output_buffer);
......@@ -206,6 +211,10 @@ hashpipe_thread_run(void *vp_args)
rv = THREAD_ERROR;
}
}
if(args->obuf != NULL) {
printf("HASH: Clearning output buffer for %s ...\n", args->thread_desc->name);
hashpipe_databuf_clear(args->obuf);
}
pthread_cleanup_push((void *)hashpipe_databuf_detach, args->obuf);
......@@ -214,7 +223,9 @@ hashpipe_thread_run(void *vp_args)
// Call user run function
if(rv == THREAD_OK) {
printf("HASH: Making call to %s ...\n", args->thread_desc->name);
rv = args->thread_desc->run(args);
printf("HASH: Finished run function for %s ...\n", args->thread_desc->name);
}
// Set thread state to finished
......@@ -330,7 +341,7 @@ int main(int argc, char *argv[])
}
// Init thread
printf("initing thread '%s' with databufs %d and %d\n",
printf("HASH: Initing thread '%s' with databufs %d and %d\n",
args[num_threads].thread_desc->name, args[num_threads].input_buffer,
args[num_threads].output_buffer);
......@@ -462,9 +473,8 @@ int main(int argc, char *argv[])
// Start threads in reverse order
for(i=num_threads-1; i >= 0; i--) {
// Launch thread
printf("starting thread '%s' with databufs %d and %d\n",
printf("HASH: starting thread '%s' with databufs %d and %d\n",
args[i].thread_desc->name, args[i].input_buffer, args[i].output_buffer);
rv = pthread_create(&threads[i], NULL,
hashpipe_thread_run, (void *)&args[i]);
......@@ -482,16 +492,16 @@ int main(int argc, char *argv[])
while (run_threads()) {
sleep(1);
}
for(i=num_threads-1; i>=0; i--) {
pthread_cancel(threads[i]);
}
for(i=num_threads-1; i>=0; i--) {
pthread_kill(threads[i], SIGINT);
}
// MCB: To have each thread terminate itself should wait to have the thread clean themselves up before killing them.
// for(i=num_threads-1; i>=0; i--) {
// pthread_cancel(threads[i]);
// }
// for(i=num_threads-1; i>=0; i--) {
// pthread_kill(threads[i], SIGINT);
// }
for(i=num_threads-1; i>=0; i--) {
pthread_join(threads[i], NULL);
printf("Joined thread '%s'\n", args[i].thread_desc->name);
printf("HASH: Joined thread '%s'\n", args[i].thread_desc->name);
fflush(stdout);
}
for(i=num_threads; i>=0; i--) {
......
......@@ -155,8 +155,6 @@ void hashpipe_databuf_clear(hashpipe_databuf_t *d)
memset(arg.array, 0, sizeof(unsigned short)*d->n_block);
semctl(d->semid, 0, SETALL, arg);
free(arg.array);
// TODO memset to 0?
}
char *hashpipe_databuf_data(hashpipe_databuf_t *d, int block_id)
......
......@@ -49,7 +49,6 @@ hashpipe_databuf_t *hashpipe_databuf_attach(int instance_id, int databuf_id);
int hashpipe_databuf_detach(hashpipe_databuf_t *d);
/* Set all semaphores to 0,
* TODO: memset to 0 as well?
*/
void hashpipe_databuf_clear(hashpipe_databuf_t *d);
......
......@@ -81,7 +81,7 @@ int runPFB(signed char* inputData_h, float* outputData_h, params pfbParams) {
CUDASafeCallWithCleanUp(cudaGetLastError());
}
float2* fftOutPtr = g_pf2FFTOut_d;
//float2* fftOutPtr = g_pf2FFTOut_d;
while(!g_IsProcDone) {
//FFT
iRet = doFFT();
......@@ -116,7 +116,9 @@ int runPFB(signed char* inputData_h, float* outputData_h, params pfbParams) {
g_pf2FFTIn_d = g_pf2FFTIn_d -countFFT*g_iNumSubBands*g_iNFFT;
int outDataSize = countFFT * g_iNumSubBands * g_iNFFT;
CUDASafeCallWithCleanUp(cudaMemcpy(outputData_h, fftOutPtr, outDataSize*sizeof(cufftComplex), cudaMemcpyDeviceToHost));
//CUDASafeCallWithCleanUp(cudaMemcpy(outputData_h, fftOutPtr, outDataSize*sizeof(cufftComplex), cudaMemcpyDeviceToHost));
//printf("making sure new build...\n");
CUDASafeCallWithCleanUp(cudaMemcpy(outputData_h, g_pf2FFTOut_d, outDataSize*sizeof(cufftComplex), cudaMemcpyDeviceToHost));
return iRet;
......
......@@ -111,7 +111,7 @@ static void * run(hashpipe_thread_args_t * args) {
if(cur_state == ACQUIRE){
next_state = ACQUIRE;
// Wait for input buffer block to be filled
while ((rv=flag_gpu_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
while ((rv=flag_gpu_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) {
int cleanb;
hashpipe_status_lock_safe(&st);
......@@ -136,39 +136,44 @@ static void * run(hashpipe_thread_args_t * args) {
printf("RTB: Initializing beamformer weights...\n");
// update_weights(weight_file);
update_weights(w_dir);
// Put metadata into status shared memory
float offsets[BN_BEAM];
char cal_filename[65];
char algorithm[65];
char weight_filename[65];
long long unsigned int bf_xid;
int act_xid;
printf("RTB: Finished updating weights...\n");
// Put metadata into status shared memory
float offsets[BN_BEAM];
char cal_filename[65];
char algorithm[65];
char weight_filename[65];
long long unsigned int bf_xid;
int act_xid;
bf_get_offsets(offsets);
bf_get_cal_filename(cal_filename);
bf_get_algorithm(algorithm);
bf_get_weight_filename(weight_filename);
bf_xid = bf_get_xid();
printf("RTBF: setting offsets...\n");
bf_get_offsets(offsets);
printf("RTBF: getting cal filename...\n");
bf_get_cal_filename(cal_filename);
printf("RTBF: getting algorithm...\n");
bf_get_algorithm(algorithm);
printf("RTBF: getting weight filename...\n");
bf_get_weight_filename(weight_filename);
bf_xid = bf_get_xid();
int i;
hashpipe_status_lock_safe(&st);
for (i = 0; i < BN_BEAM/2; i++) {
char keyword1[9];
snprintf(keyword1,8,"ELOFF%d",i);
hputr4(st.buf, keyword1, offsets[2*i]);
char keyword2[9];
snprintf(keyword2,8,"AZOFF%d",i);
hputr4(st.buf, keyword2, offsets[2*i+1]);
}
hputs(st.buf, "BCALFILE", cal_filename);
hputs(st.buf, "BALGORIT", algorithm);
hputs(st.buf, "BWFILE", weight_filename);
hgeti4(st.buf, "XID", &act_xid);
hashpipe_status_unlock_safe(&st);
hashpipe_status_lock_safe(&st);
hputs(st.buf,"WFLAG","0");
hashpipe_status_unlock_safe(&st);
int i;
hashpipe_status_lock_safe(&st);
for (i = 0; i < BN_BEAM/2; i++) {
char keyword1[9];
snprintf(keyword1,8,"ELOFF%d",i);
hputr4(st.buf, keyword1, offsets[2*i]);
char keyword2[9];
snprintf(keyword2,8,"AZOFF%d",i);
hputr4(st.buf, keyword2, offsets[2*i+1]);
}
hputs(st.buf, "BCALFILE", cal_filename);
hputs(st.buf, "BALGORIT", algorithm);
hputs(st.buf, "BWFILE", weight_filename);
hgeti4(st.buf, "XID", &act_xid);
hashpipe_status_unlock_safe(&st);
hashpipe_status_lock_safe(&st);
hputs(st.buf,"WFLAG","0");
hashpipe_status_unlock_safe(&st);
}
}
else {
......@@ -177,6 +182,7 @@ static void * run(hashpipe_thread_args_t * args) {
break;
}
}
if (!run_threads()) break;
// If CLEANUP, don't continue processing
if (next_state != CLEANUP) {
......@@ -212,6 +218,7 @@ static void * run(hashpipe_thread_args_t * args) {
if ((float) tval_result.tv_usec/1000 > 13) {
printf("RTBF: Warning!!!!!!!!! Time = %f ms\n", (float) tval_result.tv_usec/1000);
}
check_count++;
// if(check_count == 1000){
// }
......@@ -241,7 +248,7 @@ static void * run(hashpipe_thread_args_t * args) {
hashpipe_status_lock_safe(&st);
hputl(st.buf, "CLEANB",1);
hashpipe_status_unlock_safe(&st);
printf("BF: Finished CLEANUP, returning to ACQUIRE\n");
printf("RTBF: Finished CLEANUP, returning to ACQUIRE\n");
}
......@@ -257,6 +264,11 @@ static void * run(hashpipe_thread_args_t * args) {
}
// Thread terminates after loop
hashpipe_status_lock_busywait_safe(&st);
printf("RTBF: Cleaning up gpu context...\n");
rtbfCleanup();
hputs(st.buf, status_key, "terminated");
hashpipe_status_unlock_safe(&st);
return NULL;
}
......
......@@ -71,8 +71,9 @@ static void * run(hashpipe_thread_args_t * args) {
hashpipe_status_unlock_safe(&st);
char filename[256];
sprintf(filename, "%s/TGBT16A_508_01/TMP/BF/beamformer_%s_mcnt_%lld.out", data_dir, BANK, (long long)start_mcnt);
fprintf(stderr, "Saving to %s\n", filename);
//sprintf(filename, "%s/TGBT16A_508_01/TMP/BF/beamformer_%s_mcnt_%lld.out", data_dir, BANK, (long long)start_mcnt);
//fprintf(stderr, "Saving to %s\n", filename);
printf("RTBF: mcnt: %lld\n", (long long)start_mcnt);
if (SAVE) {
float * p = (float *)db_in->block[curblock_in].data;
FILE * filePtr = fopen(filename, "w");
......
......@@ -92,7 +92,7 @@ static void * run(hashpipe_thread_args_t * args) {
if (cur_state == ACQUIRE) {
next_state = ACQUIRE;
// Wait for input buffer block to be filled
while ((rv=flag_gpu_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
while ((rv=flag_gpu_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) {
int cleanb;
hashpipe_status_lock_safe(&st);
......@@ -111,6 +111,7 @@ static void * run(hashpipe_thread_args_t * args) {
break;
}
}
if(!run_threads()) break;
if (next_state != CLEANUP) {
// Print out the header information for this block
......@@ -189,10 +190,10 @@ static void * run(hashpipe_thread_args_t * args) {
}
}
// Check to see if a stop is issued
if (strcmp(integ_status, "stop") == 0) {
continue;
}
// Check to see if a stop is issued
if (strcmp(integ_status, "stop") == 0) {
continue;
}
// If we get here, then integ_status == "on"
// Setup for current chunk
......@@ -204,7 +205,7 @@ static void * run(hashpipe_thread_args_t * args) {
doDump = 1;
// Wait for new output block to be free
while ((rv=flag_gpu_correlator_output_databuf_wait_free(db_out, curblock_out)) != HASHPIPE_OK) {
while ((rv=flag_gpu_correlator_output_databuf_wait_free(db_out, curblock_out)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) {
int cleanb;
hashpipe_status_lock_safe(&st);
......@@ -290,6 +291,11 @@ static void * run(hashpipe_thread_args_t * args) {
}
// Thread terminates after loop
hashpipe_status_lock_busywait_safe(&st);
printf("COR: Cleaning gpu context...\n");
xgpuFree(&context);
hputs(st.buf, status_key, "terminated");
hashpipe_status_unlock_safe(&st);
return NULL;
}
......
......@@ -143,9 +143,12 @@
// Macros specific to the fine-channel correlator (PFB correlator)
#define N_TIME_PER_PFB_BLOCK XGPU_PFB_NTIME
#define N_CHAN_PER_PFB_BLOCK XGPU_PFB_NFREQUENCY
#define N_CHAN_PFB_SELECTED 5
#define N_PFB_COR_MATRIX (N_INPUTS/2*(N_INPUTS/2 + 1)/2*N_CHAN_PER_PFB_BLOCK*4)
#define N_BYTES_PER_PFB_BLOCK (N_TIME_PER_BLOCK * N_CHAN_PER_PFB_BLOCK * N_INPUTS * N_BITS_IQ * 2 / 8)
#define flag_pfb_gpu_input_databuf_idx(m,f,t,c) ((2*N_INPUTS_PER_FENGINE/sizeof(uint64_t))*(f+Nf*(c+N_CHAN_PFB_SELECTED*(t+Nt*m))))
// Macros to maintain cache alignment
#define CACHE_ALIGNMENT (128)
typedef uint8_t hashpipe_databuf_cache_alignment[
......@@ -158,7 +161,8 @@ typedef uint8_t hashpipe_databuf_cache_alignment[
* It is the output buffer of the flag_net_thread.
* It is the input buffer of the flag_transpose_thread.
*/
#define N_INPUT_BLOCKS 105
#define N_INPUT_BLOCKS 100
// A typedef for a block header
typedef struct flag_input_header {
......
......@@ -259,7 +259,7 @@ static void set_block_filled(flag_input_databuf_t * db, block_info_t * binfo) {
// (2) block population (output buffer data type is a block)
// (3) buffer population (if block is filled)
static inline int64_t process_packet(flag_input_databuf_t * db, struct hashpipe_udp_packet *p) {
packet_header_t pkt_header;
packet_header_t pkt_header;
// Initialize block information data types
if (!binfo.initialized) {
......@@ -456,6 +456,7 @@ static void *run(hashpipe_thread_args_t * args) {
hashpipe_status_unlock_safe(&st);
struct hashpipe_udp_packet p;
//struct hashpipe_udp_packet bh; // blackhole packet to suck up data that isnt processed
/* Give all the threads a chance to start before opening network socket */
/*
......@@ -586,8 +587,13 @@ static void *run(hashpipe_thread_args_t * args) {
************************************************************/
// If in IDLE state, look for START command
if (cur_state == IDLE) {
// cmd = check_cmd(gpu_fifo_id);
// cmd = check_cmd(gpu_fifo_id);
// keep receiving packets but send them to a blackhole packet, these wont be processed
//bh.packet_size = recv(up.sock, bh.data, HASHPIPE_MAX_PACKET_SIZE, 0);
//if(bh.packet_size != -1) {
//printf("blackhole!!!\n");
//}
// If command is START, proceed to ACQUIRE state
if (master_cmd == START) {
......@@ -648,7 +654,7 @@ static void *run(hashpipe_thread_args_t * args) {
int cleanA = 1;
int cleanB = 1;
int cleanC = 1;
printf("NET: CLEANUP condition met!\n");
printf("NET: CLEANUP condition met!\n");
sleep(1);
printf("NET: Informing other threads of cleanup condition\n");
while (cleanA != 0 && cleanB != 0 && cleanC != 0) {
......@@ -732,8 +738,8 @@ static void *run(hashpipe_thread_args_t * args) {
}
pthread_cleanup_pop(1); /* Closes push(hashpipe_udp_close) */
hashpipe_status_lock_busywait_safe(&st);
printf("NET: Exiting thread loop...\n");
hputs(st.buf, status_key, "terminated");
hashpipe_status_unlock_safe(&st);
return NULL;
......
......@@ -114,7 +114,7 @@ static void * run(hashpipe_thread_args_t * args) {
if (cur_state == ACQUIRE) {
next_state = ACQUIRE;
// Wait for input buffer block to be filled
while ((rv=flag_gpu_pfb_output_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
while ((rv=flag_gpu_pfb_output_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) {
int cleanc;
hashpipe_status_lock_safe(&st);
......@@ -133,6 +133,7 @@ static void * run(hashpipe_thread_args_t * args) {
break;
}
}
if (!run_threads()) break;
if (next_state != CLEANUP) {
// Print out the header information for this block
......@@ -329,6 +330,11 @@ static void * run(hashpipe_thread_args_t * args) {
}
// Thread terminates after loop
hashpipe_status_lock_busywait_safe(&st);
printf("COR: Cleaning up gpu context...\n");
xgpuFree(&context);
hputs(st.buf, status_key, "terminated");
hashpipe_status_unlock_safe(&st);
return NULL;
}
......
......@@ -88,21 +88,20 @@ static void * run(hashpipe_thread_args_t * args) {
if(cur_state == ACQUIRE){
next_state = ACQUIRE;
// Wait for input buffer block to be filled
while ((rv=flag_pfb_gpu_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
// Take this time to update CHANSEL
int chk_chanSel = 0;
hashpipe_status_lock_safe(&st);
hgeti4(st.buf, "CHANSEL", &chk_chanSel);
hashpipe_status_unlock_safe(&st);
if( (chanSel - chk_chanSel) != 0) {
printf("PFB: Channel Selection detected. Switching channel...\n");
pfbParams.select = chk_chanSel;
chanSel = chk_chanSel;
hashpipe_status_lock_safe(&st);
hputi4(st.buf, "CHANSEL", chanSel);
hashpipe_status_unlock_safe(&st);
}
while ((rv=flag_pfb_gpu_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
// Take this time to update CHANSEL
int chk_chanSel = 0;
hashpipe_status_lock_safe(&st);
hgeti4(st.buf, "CHANSEL", &chk_chanSel);
hashpipe_status_unlock_safe(&st);
if( (chanSel - chk_chanSel) != 0) {
printf("PFB: Channel Selection detected. Switching channel...\n");
pfbParams.select = chk_chanSel;
chanSel = chk_chanSel;
hashpipe_status_lock_safe(&st);
hputi4(st.buf, "CHANSEL", chanSel);
hashpipe_status_unlock_safe(&st);
}
if (rv==HASHPIPE_TIMEOUT) {
int cleanb;
......@@ -121,6 +120,7 @@ static void * run(hashpipe_thread_args_t * args) {
break;
}
}
if (!run_threads()) break;
// Print out the header information for this block
flag_gpu_input_header_t tmp_header;
......@@ -197,6 +197,11 @@ static void * run(hashpipe_thread_args_t * args) {
}
// Thread terminates after loop
hashpipe_status_lock_busywait_safe(&st);
printf("PFB: Cleaning up gpu context...\n");
cleanUp(); // pfb cleanup method. Should redefine in the future as a different function i.e pfbFree().
hputs(st.buf, status_key, "terminated");
hashpipe_status_unlock_safe(&st);
return NULL;
}
......@@ -205,7 +210,7 @@ static hashpipe_thread_desc_t f_thread = {
name: "flag_pfb_thread",
skey: "PFBSTAT",
init: NULL,
run: run,
run: run,
ibuf_desc: {flag_pfb_gpu_input_databuf_create},
obuf_desc: {flag_gpu_pfb_output_databuf_create}
};
......
......@@ -63,7 +63,7 @@ static void * run(hashpipe_thread_args_t * args) {
if (cur_state == ACQUIRE) {
next_state = ACQUIRE;
// Wait for input buffer block to be filled
while ((rv=flag_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
while ((rv=flag_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) { // If we are waiting for an input block...
// Check to see if network thread is in cleanup
hashpipe_status_lock_safe(&st);
......@@ -81,11 +81,12 @@ static void * run(hashpipe_thread_args_t * args) {
break;
}
}
if (!run_threads()) break;
if (next_state != CLEANUP) {
// Wait for output buffer block to be freed
while ((rv=flag_pfb_gpu_input_databuf_wait_free(db_out, curblock_out)) != HASHPIPE_OK) {
while ((rv=flag_pfb_gpu_input_databuf_wait_free(db_out, curblock_out)) != HASHPIPE_OK && run_threads()) {
if (rv == HASHPIPE_TIMEOUT) {
//hashpipe_status_lock_safe(&st);
//hputs(st.buf, status_key, "waiting for free block");
......@@ -98,6 +99,7 @@ static void * run(hashpipe_thread_args_t * args) {
break;
}
}
if (!run_threads()) break;
// Print out the header information for this block
flag_input_header_t tmp_header;
......@@ -116,8 +118,8 @@ static void * run(hashpipe_thread_args_t * args) {
hashpipe_status_lock_safe(&st);
hgeti4(st.buf, "CHANSEL", &n_chunk);
hashpipe_status_unlock_safe(&st);
int c_start = n_chunk*N_CHAN_PER_FRB_BLOCK;
int c_end = c_start + N_CHAN_PER_FRB_BLOCK;
int c_start = n_chunk*N_CHAN_PFB_SELECTED;
int c_end = c_start + N_CHAN_PFB_SELECTED;
/**********************************************
* Perform transpose
......@@ -127,20 +129,19 @@ static void * run(hashpipe_thread_args_t * args) {
uint64_t * in_p;
uint64_t * out_p;
uint64_t * block_in_p = db_in->block[curblock_in].data;
uint64_t * block_out_p = db_out->block[curblock_out].data;
uint64_t * block_out_p = db_out->block[curblock_out].data;
for (m = 0; m < Nm; m++) {
for (t = 0; t < Nt; t++) {
for (f = 0; f < Nf; f++) {
for (c = c_start; c < c_end; c++) {
// for (c = 0; c < Nc; c++) {
in_p = block_in_p + flag_input_databuf_idx(m,f,t,c);
out_p = block_out_p + flag_gpu_input_databuf_idx(m,f,t,c % N_CHAN_PER_FRB_BLOCK);
out_p = block_out_p + flag_pfb_gpu_input_databuf_idx(m,f,t,c % N_CHAN_PFB_SELECTED);
memcpy(out_p, in_p, 128/8);
}
}
}
}
// Mark block as filled
flag_pfb_gpu_input_databuf_set_filled(db_out, curblock_out);
......@@ -177,6 +178,11 @@ static void * run(hashpipe_thread_args_t * args) {
}
// Thread terminates after loop
hashpipe_status_lock_busywait_safe(&st);
printf("TRA: Exiting thread loop...\n");
hputs(st.buf, status_key, "terminated");
hashpipe_status_unlock_safe(&st);
return NULL;
}
......
......@@ -58,7 +58,7 @@ static void * run(hashpipe_thread_args_t * args) {
if (cur_state == ACQUIRE) {
next_state = ACQUIRE;
// Wait for input buffer block to be filled
while ((rv=flag_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
while ((rv=flag_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) { // If we are waiting for an input block...
// Check to see if network thread is in cleanup
hashpipe_status_lock_safe(&st);
......@@ -77,12 +77,13 @@ static void * run(hashpipe_thread_args_t * args) {
break;
}
}
if (!run_threads()) break;
//printf("TRA: Rx %lld, curblock_in %d\n", (long long int)db_in->block[curblock_in].header.mcnt_start, curblock_in);
if (next_state != CLEANUP) {
// Wait for output buffer block to be freed