Commit a36fc47a authored by Mitch Burnett's avatar Mitch Burnett
Browse files

Merge branch 'net-thread-improvements' into 'master'

Net thread mcnt processing logic improvements

See merge request !2
parents 85baeb62 f6a34f0c
......@@ -24,3 +24,4 @@ autom4te.cache
*.swp
hash/
standalone/
build/
......@@ -8,8 +8,12 @@ else
FLAG_DIR=$1
fi
# Change this to match your destination directory
prefix=/home/groups/flag/hash/
# Set default install location if prefix is not in environment
if [ -z ${prefix+x} ]
then
prefix=/home/groups/flag/hash
fi
echo "Installing FLAG to ${prefix}"
# Add prefix to PATH
if [[ "$PATH" == ?(*:)"$prefix/bin"?(:*) ]]
......@@ -75,8 +79,8 @@ echo "========================================"
echo "Navigating to $BEAM_SRC"
cd $BEAM_SRC
make clean
make
make install prefix=$prefix
make CUDA_DIR=$CUDA
make install prefix=$prefix CUDA_DIR=$CUDA
echo "========================================"
echo
......
......@@ -212,7 +212,7 @@ hashpipe_thread_run(void *vp_args)
}
}
if(args->obuf != NULL) {
printf("HASH: Clearning output buffer for %s ...\n", args->thread_desc->name);
printf("HASH: Clearing output buffer for %s ...\n", args->thread_desc->name);
hashpipe_databuf_clear(args->obuf);
}
pthread_cleanup_push((void *)hashpipe_databuf_detach, args->obuf);
......
......@@ -35,122 +35,106 @@ const int MAX_CMD_LEN = 64;
int open_fifo(char *fifo_loc)
{
int fifo_fd = open(fifo_loc, O_RDONLY | O_NONBLOCK);
if (fifo_fd<0)
{
fprintf(stderr, "vegas_fits_writer: Error opening control fifo %s\n", fifo_loc);
perror("open");
// exit(1);
}
fprintf(stderr, "FIFO created with fd: %d\n", fifo_fd);
return fifo_fd;
int fifo_fd = open(fifo_loc, O_RDONLY | O_NONBLOCK);
if (fifo_fd<0)
{
fprintf(stderr, "vegas_fits_writer: Error opening control fifo %s\n", fifo_loc);
perror("open");
// exit(1);
}
fprintf(stderr, "FIFO created with fd: %d\n", fifo_fd);
return fifo_fd;
}
cmd_t check_cmd(int fifo_fd)
{
// fprintf(stderr, "Checking FIFO with fd: %d\n", fifo_fd);
char cmd[MAX_CMD_LEN];
// fprintf(stderr, "fifo_fd: %d\n", fifo_fd);
struct pollfd pfd[1];
//pfd[1].fd = fifo_fd;
//pfd[1].events = POLLIN;
pfd[0].fd = fileno(stdin);
pfd[0].events = POLLIN;
// ?, num file desc, timeout
int rv = poll(pfd, 1, 0);
if (rv==0)
{
// fprintf(stderr, "rv == 0 :(\n");
return INVALID; //????
}
else if (rv<0)
{
if (errno!=EINTR)
{
perror("poll");
}
// fprintf(stderr, "rv < 0 :(\n");
return INVALID; //????
}
// printf("NEW COMMAND!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n");
// clear the command
memset(cmd, 0, MAX_CMD_LEN);
int i;
for (i=0; i<1; ++i)
{
rv = 0;
if (pfd[i].revents & POLLIN)
{
if (read(pfd[i].fd, cmd, MAX_CMD_LEN-1)<1)
{
// fprintf(stderr, "read failed :(\n");
// perror("read");
// return INVALID;
}
else
{
// fprintf(stderr, "read success :(\n");
rv = 1;
break;
}
}
}
if (pfd[0].revents==POLLHUP) {
//fprintf(stderr, "POLLHUP :(\n");
}
if (rv==0)
{
// fprintf(stderr, "rv == 0 again :(\n");
return INVALID;
}
else if (rv<0)
{
// fprintf(stderr, "rv < 0 again :(\n");
if (errno==EAGAIN)
{
return INVALID;
}
else
{
perror("read");
return INVALID;
}
}
// Truncate at newline
// TODO: allow multiple commands in one read?
char *ptr = strchr(cmd, '\n');
if (ptr!=NULL)
{
*ptr='\0';
}
// Process the command
if (strncasecmp(cmd,"START",MAX_CMD_LEN)==0)
{
return START;
}
else if (strncasecmp(cmd,"STOP",MAX_CMD_LEN)==0)
{
printf("FIFO: A STOP was issued to the hashpipe codes!!!!!!!!!!!!!!!!!!\n");
return STOP;
}
else if (strncasecmp(cmd,"QUIT",MAX_CMD_LEN)==0)
{
printf("FIFO: A QUIT was issued to the hashpipe codes!!!!!!!!!!!!!!!!!!\n");
return QUIT;
}
else
{
// Unknown command
return INVALID;
}
// fprintf(stderr, "Checking FIFO with fd: %d\n", fifo_fd);
char cmd[MAX_CMD_LEN];
// fprintf(stderr, "fifo_fd: %d\n", fifo_fd);
struct pollfd pfd[1];
//pfd[1].fd = fifo_fd;
//pfd[1].events = POLLIN;
pfd[0].fd = fileno(stdin);
pfd[0].events = POLLIN;
// ?, num file desc, timeout
int rv = poll(pfd, 1, 0);
if (rv==0) {
//fprintf(stderr, "rv == 0 :(\n");
return INVALID; //????
} else if (rv<0) {
if (errno!=EINTR) {
perror("poll");
}
//fprintf(stderr, "rv < 0 :(\n");
return INVALID; //????
}
// printf("NEW COMMAND!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n");
// clear the command
memset(cmd, 0, MAX_CMD_LEN);
int i;
for (i=0; i<1; ++i) {
rv = 0;
if (pfd[i].revents & POLLIN) {
if (read(pfd[i].fd, cmd, MAX_CMD_LEN-1)<1) {
//fprintf(stderr, "read failed :(\n");
//perror("read");
//return INVALID;
} else {
//fprintf(stderr, "read success :(\n");
rv = 1;
break;
}
}
}
if (pfd[0].revents==POLLHUP) {
//fprintf(stderr, "POLLHUP :(\n");
}
if (rv==0) {
//fprintf(stderr, "rv == 0 again :(\n");
return INVALID;
} else if (rv<0) {
//fprintf(stderr, "rv < 0 again :(\n");
if (errno==EAGAIN) {
return INVALID;
} else {
perror("read");
return INVALID;
}
}
// Truncate at newline
// TODO: allow multiple commands in one read?
char *ptr = strchr(cmd, '\n');
if (ptr!=NULL) {
*ptr='\0';
}
// Process the command
if (strncasecmp(cmd,"START",MAX_CMD_LEN)==0) {
return START;
} else if (strncasecmp(cmd,"STOP",MAX_CMD_LEN)==0) {
printf("FIFO: A STOP was issued to the hashpipe codes!!!!!!!!!!!!!!!!!!\n");
return STOP;
} else if (strncasecmp(cmd,"QUIT",MAX_CMD_LEN)==0) {
printf("FIFO: A QUIT was issued to the hashpipe codes!!!!!!!!!!!!!!!!!!\n");
return QUIT;
} else {
// Unknown command
return INVALID;
}
}
......@@ -49,7 +49,7 @@ static void * run(hashpipe_thread_args_t * args) {
while (run_threads()) {
// Wait for input buffer block to be filled
while ((rv=flag_gpu_beamformer_output_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
while ((rv=flag_gpu_beamformer_output_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) {
hashpipe_status_lock_safe(&st);
hputs(st.buf, status_key, "waiting for free block");
......@@ -71,8 +71,9 @@ static void * run(hashpipe_thread_args_t * args) {
hashpipe_status_unlock_safe(&st);
char filename[256];
sprintf(filename, "%s/TGBT16A_508_01/TMP/BF/beamformer_%s_mcnt_%lld.out", data_dir, BANK, (long long)start_mcnt);
fprintf(stderr, "Saving to %s\n", filename);
sprintf(filename, "/dev/null");
//sprintf(filename, "%s/TGBT16A_508_01/TMP/BF/beamformer_%s_mcnt_%lld.out", data_dir, BANK, (long long)start_mcnt);
fprintf(stderr, "Saving to %s mcnt: %lld\n", filename, (long long)start_mcnt);
//printf("RTBF: mcnt: %lld\n", (long long)start_mcnt);
if (SAVE) {
float * p = (float *)db_in->block[curblock_in].data;
......
......@@ -36,7 +36,7 @@ static void * run(hashpipe_thread_args_t * args) {
while (run_threads()) {
// Wait for input buffer block to be filled
while ((rv=flag_gpu_correlator_output_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
while ((rv=flag_gpu_correlator_output_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) {
hashpipe_status_lock_safe(&st);
hputs(st.buf, status_key, "waiting for free block");
......@@ -58,11 +58,12 @@ static void * run(hashpipe_thread_args_t * args) {
uint64_t start_mcnt = db_in->block[curblock_in].header.mcnt;
//uint64_t start_mcnt = db_in->block[curblock_in].header.mcnt;
//int64_t good_data = db_in->block[curblock_in].header.good_data;
char filename[256];
sprintf(filename, "%s/TGBT16A_508_01/TMP/BF/cor_mcnt_%lld_%s.out", directory, (long long)start_mcnt, BANK);
fprintf(stderr, "SAV: Saving to %s\n", filename);
sprintf(filename, "/dev/null");
//sprintf(filename, "%s/TGBT16A_508_01/TMP/BF/cor_mcnt_%lld_%s.out", directory, (long long)start_mcnt, BANK);
//fprintf(stderr, "SAV: Saving to %s\n", filename);
#if SAVE == 1
Complex * p = (Complex *)db_in->block[curblock_in].data;
......
......@@ -17,6 +17,14 @@ int flag_input_databuf_wait_filled(flag_input_databuf_t * d, int block_id) {
return hashpipe_databuf_wait_filled((hashpipe_databuf_t *)d, block_id);
}
int flag_input_databuf_busywait_free(flag_input_databuf_t * d, int block_id) {
return hashpipe_databuf_busywait_free((hashpipe_databuf_t *)d, block_id);
}
int flag_input_databuf_busywait_filled(flag_input_databuf_t * d, int block_id) {
return hashpipe_databuf_busywait_filled((hashpipe_databuf_t *)d, block_id);
}
int flag_input_databuf_set_free(flag_input_databuf_t * d, int block_id) {
return hashpipe_databuf_set_free((hashpipe_databuf_t *)d, block_id);
}
......
......@@ -119,6 +119,7 @@
// Number of packets per block
#define N_PACKETS_PER_BLOCK (N_BYTES_PER_BLOCK / N_BYTES_PER_PAYLOAD)
#define N_REAL_PACKETS_PER_BLOCK (N_REAL_BYTES_PER_BLOCK / N_BYTES_PER_PAYLOAD)
#define N_PACKETS_PER_BLOCK_PER_F (N_PACKETS_PER_BLOCK / N_FENGINES)
// Macro to compute data word offset for complex data word
#define Nm (N_TIME_PER_BLOCK/N_TIME_PER_PACKET) // Number of mcnts per block
......@@ -390,6 +391,8 @@ hashpipe_databuf_t * flag_input_databuf_create(int instance_id, int databuf_id);
int flag_input_databuf_wait_free (flag_input_databuf_t * d, int block_id);
int flag_input_databuf_wait_filled (flag_input_databuf_t * d, int block_id);
int flag_input_databuf_busywait_free (flag_input_databuf_t * d, int block_id);
int flag_input_databuf_busywait_filled (flag_input_databuf_t * d, int block_id);
int flag_input_databuf_set_free (flag_input_databuf_t * d, int block_id);
int flag_input_databuf_set_filled (flag_input_databuf_t * d, int block_id);
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment