Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/SystemDS-config.xml.template
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
<!-- #blocks the input frame is split up for multithreaded tokenization -->
<sysds.parallel.tokenize.numBlocks>64</sysds.parallel.tokenize.numBlocks>

<!-- if true, frame-to-matrix conversion writes NaN for cells that cannot be cast to double and warns once,
instead of failing the conversion (default false: strict, fail-fast) -->
<sysds.frame.tomatrix.warncast>false</sysds.frame.tomatrix.warncast>

<!-- enables compressed linear algebra, experimental feature -->
<sysds.compressed.linalg>false</sysds.compressed.linalg>

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/apache/sysds/conf/DMLConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class DMLConfig
public static final String PARALLEL_ENCODE_NUM_THREADS = "sysds.parallel.encode.numThreads";
public static final String PARALLEL_TOKENIZE = "sysds.parallel.tokenize";
public static final String PARALLEL_TOKENIZE_NUM_BLOCKS = "sysds.parallel.tokenize.numBlocks";
public static final String FRAME_TO_MATRIX_WARN_CAST = "sysds.frame.tomatrix.warncast";
public static final String COMPRESSED_LINALG = "sysds.compressed.linalg";
public static final String COMPRESSED_LINALG_INTERMEDIATE = "sysds.compressed.linalg.intermediate";
public static final String COMPRESSED_LOSSY = "sysds.compressed.lossy";
Expand Down Expand Up @@ -159,6 +160,7 @@ public class DMLConfig
_defaultVals.put(IO_COMPRESSION_CODEC, "none");
_defaultVals.put(PARALLEL_TOKENIZE, "false");
_defaultVals.put(PARALLEL_TOKENIZE_NUM_BLOCKS, "64");
_defaultVals.put(FRAME_TO_MATRIX_WARN_CAST, "false");
_defaultVals.put(PARALLEL_ENCODE, "true" );
_defaultVals.put(PARALLEL_ENCODE_STAGED, "false" );
_defaultVals.put(PARALLEL_ENCODE_APPLY_BLOCKS, "-1");
Expand Down Expand Up @@ -456,7 +458,7 @@ public static DMLConfig readConfigurationFile(String configPath)
public String getConfigInfo() {
String[] tmpConfig = new String[] {
LOCAL_TMP_DIR,SCRATCH_SPACE,OPTIMIZATION_LEVEL, DEFAULT_BLOCK_SIZE,
CP_PARALLEL_OPS, CP_PARALLEL_IO, PARALLEL_ENCODE, NATIVE_BLAS, NATIVE_BLAS_DIR,
CP_PARALLEL_OPS, CP_PARALLEL_IO, PARALLEL_ENCODE, FRAME_TO_MATRIX_WARN_CAST, NATIVE_BLAS, NATIVE_BLAS_DIR,
COMPRESSED_LINALG, COMPRESSED_LOSSY, COMPRESSED_VALID_COMPRESSIONS, COMPRESSED_OVERLAPPING,
COMPRESSED_SAMPLING_RATIO, COMPRESSED_SOFT_REFERENCE_COUNT,
COMPRESSED_COCODE, COMPRESSED_TRANSPOSE, COMPRESSED_TRANSFORMENCODE, DAG_LINEARIZATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public static double parseDouble(String value) {
return Double.POSITIVE_INFINITY;
else if(len == 4 && value.compareToIgnoreCase("-Inf") == 0)
return Double.NEGATIVE_INFINITY;
throw new DMLRuntimeException(e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,26 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;

public interface MatrixBlockFromFrame {
public class MatrixBlockFromFrame {
public static final Log LOG = LogFactory.getLog(MatrixBlockFromFrame.class.getName());

public static final int blocksizeIJ = 32;

public static Boolean WARNED_FOR_FAILED_CAST = false;

private MatrixBlockFromFrame(){
// private constructor for code coverage.
}

/**
* Converts a frame block with arbitrary schema into a matrix block. Since matrix block only supports value type
* double, we do a best effort conversion of non-double types which might result in errors for non-numerical data.
Expand Down Expand Up @@ -68,11 +76,15 @@ public static MatrixBlock convertToMatrixBlock(FrameBlock frame, MatrixBlock ret
if(k == -1)
k = InfrastructureAnalyzer.getLocalParallelism();

// Read once on the calling thread: the thread-local config is not visible to pool workers.
final boolean warnCast = ConfigurationManager.getDMLConfig()
.getBooleanValue(DMLConfig.FRAME_TO_MATRIX_WARN_CAST);

long nnz = 0;
if(k == 1)
nnz = convert(frame, ret, n, 0, m);
nnz = convert(frame, ret, n, 0, m, warnCast);
else
nnz = convertParallel(frame, ret, m, n, k);
nnz = convertParallel(frame, ret, m, n, k, warnCast);

ret.setNonZeros(nnz);
ret.examSparsity();
Expand All @@ -93,14 +105,37 @@ else if(ret.getNumRows() != m || ret.getNumColumns() != n || ret.isInSparseForma
return ret;
}

private static long convert(FrameBlock frame, MatrixBlock mb, int n, int rl, int ru) {
/**
 * Converts the rows [rl, ru) of the frame into the given matrix block and returns the number of non-zero cells
 * written for that row range.
 * 
 * In strict mode (warnCast == false) any NumberFormatException or DMLRuntimeException raised by the conversion
 * propagates to the caller. In warn mode the strict conversion is attempted first; if it fails, the failure is
 * logged once per JVM and the whole row range is converted again cell by cell, writing NaN for every cell that
 * cannot be cast to double.
 * 
 * @param frame    the input frame block
 * @param mb       the dense output matrix block to write into
 * @param n        the number of columns to convert
 * @param rl       the first row to convert (inclusive)
 * @param ru       the last row to convert (exclusive)
 * @param warnCast if true, fall back to NaN on cast failures instead of failing the conversion
 * @return the number of non-zero values written for the rows [rl, ru)
 */
private static long convert(FrameBlock frame, MatrixBlock mb, int n, int rl, int ru, boolean warnCast) {
	// Strict (default): let number format errors propagate and fail the conversion.
	if(!warnCast)
		return convertStrict(frame, mb, n, rl, ru);

	// Warn-only: on number format errors fall back to writing NaN for the incompatible cells.
	try {
		return convertStrict(frame, mb, n, rl, ru);
	}
	catch(NumberFormatException | DMLRuntimeException e) {
		// Lock on the class object rather than on WARNED_FOR_FAILED_CAST: the flag is a boxed Boolean that is
		// reassigned below, so locking on it would use the JVM-wide Boolean.FALSE/Boolean.TRUE instances and a
		// different monitor before and after the flag flips, which does not guarantee a single warning.
		synchronized(MatrixBlockFromFrame.class) {
			if(!WARNED_FOR_FAILED_CAST) {
				LOG.error(
					"Failed to convert to Matrix because of number format errors, falling back to NaN on incompatible cells",
					e);
				WARNED_FOR_FAILED_CAST = true;
			}
		}
		// Re-run the full row range: the strict pass may have stopped part way through.
		return convertSafeCast(frame, mb, n, rl, ru);
	}
}

/**
 * Fail-fast conversion of the rows [rl, ru): dispatches to the contiguous or the generic conversion depending on
 * whether the dense block of the output matrix is contiguous. Exceptions raised by either path are not handled
 * here.
 * 
 * @param frame the input frame block
 * @param mb    the dense output matrix block to write into
 * @param n     the number of columns to convert
 * @param rl    the first row to convert (inclusive)
 * @param ru    the last row to convert (exclusive)
 * @return the value returned by the selected conversion (used by the caller as the non-zero count)
 */
private static long convertStrict(FrameBlock frame, MatrixBlock mb, int n, int rl, int ru) {
	final boolean contiguous = mb.getDenseBlock().isContiguous();
	return contiguous ? convertContiguous(frame, mb, n, rl, ru) : convertGeneric(frame, mb, n, rl, ru);
}

private static long convertParallel(FrameBlock frame, MatrixBlock mb, int m, int n, int k) throws Exception {
private static long convertParallel(FrameBlock frame, MatrixBlock mb, int m, int n, int k, boolean warnCast)
throws Exception {
ExecutorService pool = CommonThreadPool.get(k);
try {
List<Future<Long>> tasks = new ArrayList<>();
Expand All @@ -109,7 +144,7 @@ private static long convertParallel(FrameBlock frame, MatrixBlock mb, int m, int
for(int i = 0; i < m; i += blkz) {
final int start = i;
final int end = Math.min(i + blkz, m);
tasks.add(pool.submit(() -> convert(frame, mb, n, start, end)));
tasks.add(pool.submit(() -> convert(frame, mb, n, start, end, warnCast)));
}

long nnz = 0;
Expand Down Expand Up @@ -169,4 +204,37 @@ private static long convertBlockGeneric(final FrameBlock frame, long lnnz, final
}
return lnnz;
}

/**
 * Tolerant conversion of the rows [rl, ru): walks the row range in tiles of blocksizeIJ x blocksizeIJ cells and
 * converts each tile with the NaN-on-failure cell conversion.
 * 
 * @param frame the input frame block
 * @param mb    the dense output matrix block to write into
 * @param n     the number of columns to convert
 * @param rl    the first row to convert (inclusive)
 * @param ru    the last row to convert (exclusive)
 * @return the accumulated non-zero count over all tiles
 */
private static long convertSafeCast(final FrameBlock frame, final MatrixBlock mb, final int n, final int rl,
	final int ru) {
	final DenseBlock dense = mb.getDenseBlock();
	long nnz = 0;
	for(int rowStart = rl; rowStart < ru; rowStart += blocksizeIJ) {
		// the row bound of a tile does not depend on the column tile
		final int rowEnd = Math.min(rowStart + blocksizeIJ, ru);
		for(int colStart = 0; colStart < n; colStart += blocksizeIJ) {
			final int colEnd = Math.min(colStart + blocksizeIJ, n);
			nnz = convertBlockSafeCast(frame, nnz, dense, rowStart, colStart, rowEnd, colEnd);
		}
	}
	return nnz;
}

/**
 * Converts one tile (rows [rl, ru), columns [cl, cu)) of the frame into the dense block. A cell whose read throws
 * a NumberFormatException or DMLRuntimeException is written as NaN and counted as a non-zero.
 * 
 * @param frame the input frame block
 * @param lnnz  the non-zero count accumulated so far
 * @param c     the dense block to write into
 * @param rl    the first row of the tile (inclusive)
 * @param cl    the first column of the tile (inclusive)
 * @param ru    the last row of the tile (exclusive)
 * @param cu    the last column of the tile (exclusive)
 * @return lnnz plus the number of non-zero cells written for this tile
 */
private static long convertBlockSafeCast(final FrameBlock frame, long lnnz, final DenseBlock c, final int rl,
	final int cl, final int ru, final int cu) {
	for(int row = rl; row < ru; row++) {
		final double[] rowValues = c.values(row);
		final int rowOffset = c.pos(row);
		for(int col = cl; col < cu; col++) {
			final int idx = rowOffset + col;
			try {
				final double v = frame.getDoubleNaN(row, col);
				rowValues[idx] = v;
				if(v != 0)
					lnnz++;
			}
			catch(NumberFormatException | DMLRuntimeException e) {
				// incompatible cell: count it as a non-zero and store NaN
				lnnz++;
				rowValues[idx] = Double.NaN;
			}
		}
	}
	return lnnz;
}

}
9 changes: 5 additions & 4 deletions src/main/java/org/apache/sysds/utils/DoubleParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public interface DoubleParser {
0x8e679c2f5e44ff8fL};

public static double parseFloatingPointLiteral(String str, int offset, int endIndex) {
if(endIndex > 100)
if(endIndex > 100)// long string
return Double.parseDouble(str);
// Skip leading whitespace
int index = skipWhitespace(str, offset, endIndex);
Expand All @@ -197,9 +197,10 @@ public static double parseFloatingPointLiteral(String str, int offset, int endIn
}

// Parse NaN or Infinity (this occurs rarely)
if(ch >= 'I')
return Double.parseDouble(str);
else if(str.charAt(endIndex - 1) >= 'a')
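// ':' is the first ASCII character after the digits and '0' is the first digit,
// so the check below tests whether the last character is a digit.
// We look at the last position because input taking the fast path below must end in a digit;
// anything else is delegated to Double.parseDouble.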
if(str.charAt(endIndex - 1) > '9' || str.charAt(endIndex - 1) < '0')
return Double.parseDouble(str);

final double val = parseDecFloatLiteral(str, index, offset, endIndex);
Expand Down
Loading
Loading