Commit f6ad7ca3 authored by Mark Ruzindana's avatar Mark Ruzindana
Browse files

Merging changes made to include transpose logic in net thread and remove...

Merging changes made to include transpose logic in net thread and remove transpose thread from HASHPIPE pipeline. The file that should be called in place of flag_net_thread.c in the pipeline is flag_net_transpose_thread.c and instead of flag_correlator_thread.c, use flag_corr_transpose_thread.c. These are specified in Makefile.am.
parents a36fc47a 0c62daf1
......@@ -44,7 +44,7 @@ sigma2 = kb*Tsys*BW; % Noise power per channel
% 8 -> Send exponentially correlated noise.
% 9 -> Send pulsar data
% else -> Send all zeros
data_flag = 9;
data_flag = 5;
% Sinusoid parameters (only used if data_flag = 2)
% It should be noted that the phase of the sinusoid will not change between
......@@ -262,7 +262,7 @@ mcnt = 0; % Each mcnt represents 20 packets across all F-engines in the
% same time frame
% Regular coarse and fine correlator %%%%%%%%%%%%%%%%%%%%%%%%%%%
for mcnt = [0:401,800,1200,1600,2000] % [0:801,1200,1600,2000,2400] % No scalloping fix %while mcnt <= 10000
for mcnt = [0:3600] %,800,1200,1600,2000] % [0:801,1200,1600,2000,2400] % No scalloping fix %while mcnt <= 10000
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Scalloping fix fine correlator %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% for mcnt = [0:1601,2400,3200,4000,4800]
......
......@@ -57,7 +57,7 @@ void update_weights(char * filename){
//weights_dc = (float complex *)malloc(BN_WEIGHTS*sizeof(float complex *));
//weights_dc_n = (float complex *)malloc(BN_WEIGHTS*sizeof(float complex *));
// For pinned memory ////////////////////////////////////////////////////
// Weights doesn't need pinned memory implemented because it is less than 16 MB ///////////
// Weights don't need pinned memory implemented because it is less than 16 MB ///////////
cudaMallocHost((void **)&bf_weights,2*BN_WEIGHTS*sizeof(float complex *));
cudaMallocHost((void **)&weights_dc,BN_WEIGHTS*sizeof(float complex *));
cudaMallocHost((void **)&weights_dc_n,BN_WEIGHTS*sizeof(float complex *));
......@@ -728,7 +728,6 @@ void rtbfCleanup() {
if (d_data1 != NULL) {
cudaFree(d_data1);
//cudaFreeHost(d_data1); // Clean up pinned memory (use cudaFreeHost() with cudaMallocHost())
}
if (d_data2 != NULL) {
......@@ -741,7 +740,6 @@ void rtbfCleanup() {
if (d_weights != NULL) {
cudaFree(d_weights);
//cudaFreeHost(d_weights); // Clean up pinned memory (use cudaFreeHost() with cudaMallocHost())
}
if (d_arr_A != NULL) {
......
......@@ -57,7 +57,7 @@ void update_weights(char * filename){
//weights_dc = (float complex *)malloc(BN_WEIGHTS*sizeof(float complex *));
//weights_dc_n = (float complex *)malloc(BN_WEIGHTS*sizeof(float complex *));
// For pinned memory ////////////////////////////////////////////////////
// Weights doesn't need pinned memory implemented because it is less than 16 MB ///////////
// Weights don't need pinned memory implemented because it is less than 16 MB ///////////
cudaMallocHost((void **)&bf_weights,2*BN_WEIGHTS*sizeof(float complex *));
cudaMallocHost((void **)&weights_dc,BN_WEIGHTS*sizeof(float complex *));
cudaMallocHost((void **)&weights_dc_n,BN_WEIGHTS*sizeof(float complex *));
......@@ -728,7 +728,6 @@ void rtbfCleanup() {
if (d_data1 != NULL) {
cudaFree(d_data1);
//cudaFreeHost(d_data1); // Clean up pinned memory (use cudaFreeHost() with cudaMallocHost())
}
if (d_data2 != NULL) {
......@@ -741,7 +740,6 @@ void rtbfCleanup() {
if (d_weights != NULL) {
cudaFree(d_weights);
//cudaFreeHost(d_weights); // Clean up pinned memory (use cudaFreeHost() with cudaMallocHost())
}
if (d_arr_A != NULL) {
......
......@@ -69,6 +69,8 @@ static void *run(hashpipe_thread_args_t * args)
hputi4(st.buf, "NULBLKIN", block_idx);
hashpipe_status_unlock_safe(&st);
fprintf(stderr, "Writing to /dev/null\n");
// Mark block as free
hashpipe_databuf_set_free(db, block_idx);
......
#define XGPU_VERSION 0.1+182@g6ce42de-dirty
#define XGPU_VERSION 0.1+184@g933d475-dirty
......@@ -36,9 +36,15 @@ fifo_codes = fifo.h \
flag_databuf = flag_databuf.h \
flag_databuf.c
flag_x_threads = flag_net_thread.c \
flag_transpose_thread.c \
flag_correlator_thread.c \
# Separate transpose thread
# flag_x_threads = flag_net_thread.c \
# flag_transpose_thread.c \
# flag_correlator_thread.c \
# flag_corsave_thread.c
# Combined Net & Transpose
flag_x_threads = flag_net_transpose_thread.c \
flag_corr_transpose_thread.c \
flag_corsave_thread.c
# CPU Transpose
......@@ -67,12 +73,19 @@ flag_x_frb_threads = flag_net_thread.c \
flag_frb_correlator_thread.c \
flag_frb_corsave_thread.c
# Separate transpose thread
flag_fx_threads = flag_net_thread.c \
flag_pfb_transpose_thread.c \
flag_pfb_thread.c \
flag_pfb_correlator_thread.c \
flag_pfb_corsave_thread.c
# Combined Net & Transpose
#flag_fx_threads = flag_net_pfb_thread.c \
# flag_pfb2_thread.c \
# flag_pfb_corr_transpose_thread.c \
# flag_pfb_corsave_thread.c
# This lists all of the plugins that will be created
lib_LTLIBRARIES = flag_x.la flag_b.la flag_f.la flag_x_frb.la flag_bx.la flag_fx.la
......
......@@ -131,7 +131,6 @@ cmd_t check_cmd(int fifo_fd)
} else if (strncasecmp(cmd,"QUIT",MAX_CMD_LEN)==0) {
printf("FIFO: A QUIT was issued to the hashpipe codes!!!!!!!!!!!!!!!!!!\n");
return QUIT;
} else {
// Unknown command
return INVALID;
......
......@@ -9,6 +9,7 @@
#include <pthread.h>
#include <string.h>
#include <time.h>
#include "hashpipe.h"
#include "flag_databuf.h"
#include <xgpu.h>
......@@ -46,6 +47,19 @@ static void * run(hashpipe_thread_args_t * args) {
int rv;
int curblock_in = 0;
// Modified to append to a file rather than overwriting to a file ////////////////////
//char BANK[5];
//hashpipe_status_lock_safe(&st);
//hgets(st.buf, "DATADIR", 127, data_dir);
//hgets(st.buf, "BANKNAM", 4, BANK);
//hashpipe_status_unlock_safe(&st);
//char filename[256];
//sprintf(filename, "%s/TGBT16A_508_01/TMP/BF/beamformer_%s.out", data_dir, BANK);
//FILE * filePtr = fopen(filename, "a");
////////////////////////////////////////////////////////////////////////////////////
while (run_threads()) {
// Wait for input buffer block to be filled
......@@ -78,9 +92,16 @@ static void * run(hashpipe_thread_args_t * args) {
if (SAVE) {
float * p = (float *)db_in->block[curblock_in].data;
FILE * filePtr = fopen(filename, "w");
//FILE * filePtr = fopen(filename, "a");
fwrite(p, sizeof(float), N_BEAM_SAMPS, filePtr);
fwrite(&good_data, sizeof(int), 1, filePtr);
fclose(filePtr);
//struct timespec ts = {0, 1000}; // One microsecond
//int max_tries = 380000; // One million microseconds
//int i;
//for(i = 0; i < max_tries; i++) {
// nanosleep(&ts, NULL);
//}
}
// Mark input block as free and wait for next block
......@@ -90,6 +111,7 @@ static void * run(hashpipe_thread_args_t * args) {
// Check if program killed
pthread_testcancel();
}
//fclose(filePtr);
// Thread terminates after loop
return NULL;
......
/* flag_beamsave_thread.c
*
* Routine to save total power outputs to file for data verification
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include "hashpipe.h"
#include "flag_databuf.h"
#include <xgpu.h>
// #include "total_power.h"
// Create thread status buffer
static hashpipe_status_t * st_p;
// Run method for the thread.
//
// Drains the beamformer output databuf and, when SAVE is non-zero, appends
// each block's samples (N_BEAM_SAMPS floats) followed by its good_data flag
// (one int) to
//     <DATADIR>/TGBT16A_508_01/TMP/BF/beamformer_<BANKNAM>.out
// The file is opened once in append mode before the main loop and closed
// when the thread leaves the loop.
//
// args: hashpipe thread arguments; args->ibuf is used as the beamformer
//       output databuf and args->st as the status buffer.
// Returns NULL when run_threads() becomes false. Calls pthread_exit(NULL)
// if waiting on the databuf fails with anything other than a timeout.
static void * run(hashpipe_thread_args_t * args) {
    // Local aliases to shorten access to args fields
    flag_gpu_beamformer_output_databuf_t * db_in = (flag_gpu_beamformer_output_databuf_t *)args->ibuf;
    hashpipe_status_t st = args->st;
    const char * status_key = args->thread_desc->skey;
    // Allow global (this source file) access to the status buffer.
    // NOTE: st is a local copy, so st_p is only valid while run() is executing.
    st_p = &st;

    // Defaults used when the status keys are absent. Without them the
    // buffers would be read uninitialized when building the file name.
    char data_dir[128] = ".";
    char BANK[5] = "";

    hashpipe_status_lock_safe(&st);
    int have_dir = hgets(st.buf, "DATADIR", 127, data_dir);
    hgets(st.buf, "BANKNAM", 4, BANK);
    hashpipe_status_unlock_safe(&st);

    if (!have_dir) {
        // Key not present: fall back to the current directory
        strcpy(data_dir, ".");
        printf("SAV: DATADIR = .\n");
    }
    else {
        printf("SAV: DATADIR = %s\n", data_dir);
    }

    // Mark thread as ready to run
    hashpipe_status_lock_safe(&st);
    hputi4(st.buf, "SAVEREADY", 1);
    hashpipe_status_unlock_safe(&st);

    int rv;
    int curblock_in = 0;

    // Open the output file once, in append mode, so that every block is
    // added to the same file instead of overwriting the previous one.
    char filename[256];
    snprintf(filename, sizeof(filename), "%s/TGBT16A_508_01/TMP/BF/beamformer_%s.out", data_dir, BANK);
    FILE * filePtr = fopen(filename, "a");
    if (filePtr == NULL) {
        // Keep draining the databuf so upstream threads do not stall,
        // but report that nothing will be written.
        perror(filename);
        hashpipe_error(__FUNCTION__, "could not open beamformer output file; blocks will not be saved");
    }

    while (run_threads()) {

        // Wait for input buffer block to be filled
        while ((rv=flag_gpu_beamformer_output_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
            if (rv==HASHPIPE_TIMEOUT) {
                // NOTE: the status text says "free" although the wait is for a filled block
                hashpipe_status_lock_safe(&st);
                hputs(st.buf, status_key, "waiting for free block");
                hashpipe_status_unlock_safe(&st);
            }
            else {
                hashpipe_error(__FUNCTION__, "error waiting for filled databuf block");
                // Release the output file before the thread exits
                if (filePtr != NULL) {
                    fclose(filePtr);
                }
                pthread_exit(NULL);
            }
        }

        uint64_t start_mcnt = db_in->block[curblock_in].header.mcnt;
        int good_data = (int)(db_in->block[curblock_in].header.good_data);

        fprintf(stderr, "Saving to %s at mcnt %lld\n", filename, (long long)start_mcnt);

        if (SAVE && filePtr != NULL) {
            float * p = (float *)db_in->block[curblock_in].data;
            size_t n_expected = N_BEAM_SAMPS;
            size_t n_samples = fwrite(p, sizeof(float), n_expected, filePtr);
            size_t n_flags = fwrite(&good_data, sizeof(int), 1, filePtr);
            if (n_samples != n_expected || n_flags != 1) {
                hashpipe_error(__FUNCTION__, "short write to beamformer output file");
            }
        }

        // Mark input block as free and wait for next block
        flag_gpu_beamformer_output_databuf_set_free(db_in, curblock_in);
        curblock_in = (curblock_in + 1) % db_in->header.n_block;

        // Check if program killed
        pthread_testcancel();
    }

    if (filePtr != NULL) {
        fclose(filePtr);
    }

    // Thread terminates after loop
    return NULL;
}
// Thread description: binds the "BEAMSAVE" status key and the run() routine
// to the beamformer output databuf as this thread's input. The thread has no
// init routine and no output buffer.
static hashpipe_thread_desc_t bsave_thread = {
    .name      = "flag_beamsave_thread",
    .skey      = "BEAMSAVE",
    .init      = NULL,
    .run       = run,
    .ibuf_desc = {flag_gpu_beamformer_output_databuf_create},
    .obuf_desc = {NULL}
};

// Runs at load time (constructor attribute) and registers the thread
// description with hashpipe.
static __attribute__((constructor)) void ctor() {
    register_hashpipe_thread(&bsave_thread);
}
/* flag_beamsave_thread.c
*
* Routine to save total power outputs to file for data verification
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include "hashpipe.h"
#include "flag_databuf.h"
#include <xgpu.h>
// #include "total_power.h"
// Create thread status buffer
static hashpipe_status_t * st_p;
// Run method for the thread.
//
// Drains the beamformer output databuf. For every filled block, when SAVE is
// non-zero, it writes the block's samples (N_BEAM_SAMPS floats) followed by
// its good_data flag (one int) to
//     <DATADIR>/TGBT16A_508_01/TMP/BF/beamformer_<BANKNAM>.out
// The file is reopened in "w" mode for each block, so it only ever holds the
// most recently saved block.
//
// args: hashpipe thread arguments; args->ibuf is used as the beamformer
//       output databuf and args->st as the status buffer.
// Returns NULL when run_threads() becomes false. Calls pthread_exit(NULL)
// if waiting on the databuf fails with anything other than a timeout.
static void * run(hashpipe_thread_args_t * args) {
    // Local aliases to shorten access to args fields
    flag_gpu_beamformer_output_databuf_t * db_in = (flag_gpu_beamformer_output_databuf_t *)args->ibuf;
    hashpipe_status_t st = args->st;
    const char * status_key = args->thread_desc->skey;
    // Allow global (this source file) access to the status buffer.
    // NOTE: st is a local copy, so st_p is only valid while run() is executing.
    st_p = &st;

    // Default used when DATADIR is absent from the status buffer. Without it
    // the buffer would be read uninitialized when building the file name.
    char data_dir[128] = ".";

    hashpipe_status_lock_safe(&st);
    int have_dir = hgets(st.buf, "DATADIR", 127, data_dir);
    hashpipe_status_unlock_safe(&st);

    if (!have_dir) {
        // Key not present: fall back to the current directory
        strcpy(data_dir, ".");
        printf("SAV: DATADIR = .\n");
    }
    else {
        printf("SAV: DATADIR = %s\n", data_dir);
    }

    // Mark thread as ready to run
    hashpipe_status_lock_safe(&st);
    hputi4(st.buf, "SAVEREADY", 1);
    hashpipe_status_unlock_safe(&st);

    int rv;
    int curblock_in = 0;

    while (run_threads()) {

        // Wait for input buffer block to be filled
        while ((rv=flag_gpu_beamformer_output_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK) {
            if (rv==HASHPIPE_TIMEOUT) {
                // NOTE: the status text says "free" although the wait is for a filled block
                hashpipe_status_lock_safe(&st);
                hputs(st.buf, status_key, "waiting for free block");
                hashpipe_status_unlock_safe(&st);
            }
            else {
                hashpipe_error(__FUNCTION__, "error waiting for filled databuf block");
                pthread_exit(NULL);
            }
        }

        uint64_t start_mcnt = db_in->block[curblock_in].header.mcnt;
        int good_data = (int)(db_in->block[curblock_in].header.good_data);

        // Re-read the destination for every block; an absent BANKNAM yields
        // an empty bank name rather than uninitialized bytes.
        char BANK[5] = "";
        hashpipe_status_lock_safe(&st);
        hgets(st.buf, "DATADIR", 127, data_dir);
        hgets(st.buf, "BANKNAM", 4, BANK);
        hashpipe_status_unlock_safe(&st);

        char filename[256];
        snprintf(filename, sizeof(filename), "%s/TGBT16A_508_01/TMP/BF/beamformer_%s.out", data_dir, BANK);
        fprintf(stderr, "Saving to %s at mcnt %lld\n", filename, (long long)start_mcnt);

        if (SAVE) {
            float * p = (float *)db_in->block[curblock_in].data;
            FILE * filePtr = fopen(filename, "w");
            if (filePtr == NULL) {
                // Skip this block's write instead of passing NULL to fwrite()
                perror(filename);
                hashpipe_error(__FUNCTION__, "could not open beamformer output file; block not saved");
            }
            else {
                size_t n_expected = N_BEAM_SAMPS;
                size_t n_samples = fwrite(p, sizeof(float), n_expected, filePtr);
                size_t n_flags = fwrite(&good_data, sizeof(int), 1, filePtr);
                if (n_samples != n_expected || n_flags != 1) {
                    hashpipe_error(__FUNCTION__, "short write to beamformer output file");
                }
                fclose(filePtr);
            }
        }

        // Mark input block as free and wait for next block
        flag_gpu_beamformer_output_databuf_set_free(db_in, curblock_in);
        curblock_in = (curblock_in + 1) % db_in->header.n_block;

        // Check if program killed
        pthread_testcancel();
    }

    // Thread terminates after loop
    return NULL;
}
// Thread description: binds the "BEAMSAVE" status key and the run() routine
// to the beamformer output databuf as this thread's input. The thread has no
// init routine and no output buffer.
static hashpipe_thread_desc_t bsave_thread = {
    .name      = "flag_beamsave_thread",
    .skey      = "BEAMSAVE",
    .init      = NULL,
    .run       = run,
    .ibuf_desc = {flag_gpu_beamformer_output_databuf_create},
    .obuf_desc = {NULL}
};

// Runs at load time (constructor attribute) and registers the thread
// description with hashpipe.
static __attribute__((constructor)) void ctor() {
    register_hashpipe_thread(&bsave_thread);
}
/* flag_correlator_thread.c
*
* Routine to correlate received packets
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <math.h>
#include <xgpu.h>
#include "hashpipe.h"
#include "flag_databuf.h"
// Create thread status buffer
static hashpipe_status_t * st_p;
// Enumerated states for this file's (flag_correlator_thread) state machine.
// run() tracks the current and next state using values of this type.
typedef enum {
ACQUIRE, // Normal operation: run() waits for input databuf blocks to be filled
CLEANUP // Selected when CLEANA reads 0 and NETSTAT reads "CLEANUP" during a wait timeout
} state;
// Run method for the thread
// It is meant to do the following:
// (1) Initialize status buffer
// (2) Start main loop
// (2a) Wait for input buffer block to be filled
// (2b) Print out some data in the block
static void * run(hashpipe_thread_args_t * args) {
// Local aliases to shorten access to args fields
flag_input_databuf_t * db_in = (flag_input_databuf_t *)args->ibuf;
flag_gpu_correlator_output_databuf_t * db_out = (flag_gpu_correlator_output_databuf_t *)args->obuf;
hashpipe_status_t st = args->st;
const char * status_key = args->thread_desc->skey;
st_p = &st; // allow global (this source file) access to the status buffer
// Initialize correlator integrator status to "off"
// Initialize starting mcnt to 0 (INTSYNC)
char integ_status[17];
int gpu_dev = 0;
hashpipe_status_lock_safe(&st);
hputs(st.buf, "INTSTAT", "off");
hputi8(st.buf, "INTSYNC", 0);
hputr4(st.buf, "REQSTI", 0.5); // Requested STI length (set by Dealer/Player)
hputr4(st.buf, "ACTSTI", 0.0); // Delivered (actual) STI length (based on whole number of blocks)
hputi4(st.buf, "INTCOUNT", 1); // Number of blocks to integrate per STI
hgeti4(st.buf, "GPUDEV", &gpu_dev);
hputi4(st.buf, "GPUDEV", gpu_dev);
hashpipe_status_unlock_safe(&st);
// Initialize xGPU context structure
// Comment from PAPER:
// Initialize context to point at first input and output memory blocks.
// This seems redundant since we do this just before calling
// xgpuCudaXengine, but we need to pass something in for array_h and
// matrix_x to prevent xgpuInit from allocating memory.
XGPUContext context;
context.array_h = (ComplexInput *)db_in->block[0].data;
context.matrix_h = (Complex *)db_out->block[0].data;
context.array_len = (db_in->header.n_block * sizeof(flag_input_block_t) - sizeof(flag_input_header_t))/sizeof(ComplexInput);
context.matrix_len = (db_out->header.n_block * sizeof(flag_gpu_correlator_output_block_t) - sizeof(flag_gpu_output_header_t))/sizeof(Complex);
int xgpu_error = xgpuInit(&context, gpu_dev);
if (XGPU_OK != xgpu_error) {
fprintf(stderr, "ERROR: xGPU initialization failed (error code %d)\n", xgpu_error);
return THREAD_ERROR;
}
// Mark thread as ready to run
hashpipe_status_lock_safe(&st);
hputi4(st.buf, "CORREADY", 1);
hashpipe_status_unlock_safe(&st);
int rv;
int curblock_in = 0;
int curblock_out = 0;
uint64_t start_mcnt = 0;
uint64_t last_mcnt = 0;
int int_count = 1; // Number of blocks to integrate per dump
state cur_state = ACQUIRE;
state next_state = ACQUIRE;
char netstat[17];
int64_t good_data = 1;
while (run_threads()) {
if (cur_state == ACQUIRE) {
next_state = ACQUIRE;
// Wait for input buffer block to be filled
while ((rv=flag_input_databuf_wait_filled(db_in, curblock_in)) != HASHPIPE_OK && run_threads()) {
if (rv==HASHPIPE_TIMEOUT) {
int cleanb;
hashpipe_status_lock_safe(&st);
hgetl(st.buf, "CLEANA", &cleanb);
hgets(st.buf, "NETSTAT", 16, netstat);
hashpipe_status_unlock_safe(&st);
if (cleanb == 0 && strcmp(netstat, "CLEANUP") == 0) {
printf("COR: Cleanup condition met!\n");
next_state = CLEANUP;
break;
}
}
else {
hashpipe_error(__FUNCTION__, "error waiting for filled databuf block");
pthread_exit(NULL);
break;
}
}
if(!run_threads()) break;
if (next_state != CLEANUP) {
// Print out the header information for this block
flag_input_header_t tmp_header;
memcpy(&tmp_header, &db_in->block[curblock_in].header, sizeof(flag_input_header_t));
//printf("COR: Received block %d, starting mcnt = %lld\n", curblock_in, (long long int)tmp_header.mcnt_start);
good_data &= tmp_header.good_data;
hashpipe_status_lock_safe(&st);
hputi4(st.buf, "CORMCNT", tmp_header.mcnt_start);
hashpipe_status_unlock_safe(&st);
// Retrieve correlator integrator status
hashpipe_status_lock_safe(&st);
hgets(st.buf, "INTSTAT", 16, integ_status);
hashpipe_status_unlock_safe(&st);
// If the correlator integrator status is "off,"
// Free the input block and continue
if (strcmp(integ_status, "off") == 0) {
// fprintf(stderr, "COR: Correlator is off...\n");
flag_input_databuf_set_free(db_in, curblock_in);
curblock_in = (curblock_in + 1) % db_in->header.n_block;
good_data = 1;
continue;
}
// If the correlator integrator status is "start,"
// Get the correlator started
// The INTSTAT string is set to "start" by the net thread once it's up and running
if (strcmp(integ_status, "start") == 0) {
// Get the starting mcnt for integration (should be zero)
hashpipe_status_lock_safe(&st);
hgeti4(st.buf, "NETMCNT", (int *)(&start_mcnt));
hashpipe_status_unlock_safe(&st);
// Check to see if block's starting mcnt matches INTSYNC
if (db_in->block[curblock_in].header.mcnt_start < start_mcnt) {
// If we get here, then there is a bug since the net thread shouldn't
// mark blocks as filled that are before the starting mcnt
// fprintf(stderr, "COR: Unable to start yet... waiting for mcnt = %lld\n", (long long int)start_mcnt);
// starting mcnt not yet reached
// free block and continue
flag_input_databuf_set_free(db_in, curblock_in);
curblock_in = (curblock_in + 1) % db_in->header.n_block;
continue;
}
else if (db_in->block[curblock_in].header.mcnt_start == start_mcnt) {