from torch import Tensor
import tempfile
import torch
import numpy as np
ch = torch  # short alias: the code below refers to torch as ``ch``
[docs]
def test_install(use_fast_jl: bool = True):
    """Smoke-test the TRAK installation on a tiny random problem.

    Builds 20 random 256-dimensional inputs with binary labels and a bias-free
    ``Linear(256, 2)`` model. It then runs :code:`TRAKer.load_checkpoint` and
    :code:`TRAKer.featurize` with a temporary save directory, and prints a
    success message.

    Args:
        use_fast_jl (bool, optional):
            If True, move data and model to CUDA and let TRAKer use its
            default projector (presumably the ``fast_jl`` CUDA projector,
            going by the success message; confirm). If False, run on CPU
            with a :class:`NoOpProjector`. Defaults to True.

    Raises:
        ImportError: if :code:`trak` cannot be imported. The underlying import
            error is attached as ``__cause__``.
    """
    try:
        from trak import TRAKer
    except ImportError as err:
        # Chain the original error: a broken dependency *inside* trak also
        # surfaces as ImportError and should not be hidden.
        raise ImportError(
            "TRAK is not installed! Please install it using `pip install traker`"
        ) from err

    data = (ch.randn(20, 256), ch.randint(high=2, size=(20,)))
    model = ch.nn.Linear(256, 2, bias=False)

    # Only the device placement / projector and the success message differ
    # between the two modes; everything else is shared below.
    if use_fast_jl:
        data = [x.cuda() for x in data]
        model = model.cuda()
        mode_kwargs = {}
        success_msg = "TRAK and fast_jl are installed correctly!"
    else:
        from trak.projectors import NoOpProjector

        mode_kwargs = {"projector": NoOpProjector(), "device": "cpu"}
        success_msg = "TRAK is installed correctly!"

    with tempfile.TemporaryDirectory() as tmpdirname:
        traker = TRAKer(
            model=model,
            task="image_classification",
            proj_dim=512,
            save_dir=tmpdirname,
            train_set_size=20,
            logging_level=100,
            **mode_kwargs,
        )
        traker.load_checkpoint(model.state_dict(), model_id=0)
        traker.featurize(data, num_samples=20)
        print(success_msg)
[docs]
def parameters_to_vector(parameters) -> Tensor:
    """
    Flatten every tensor in ``parameters`` and concatenate them into one 1-D tensor.

    Mirrors https://pytorch.org/docs/stable/generated/torch.nn.utils.parameters_to_vector.html
    but flattens with :code:`reshape` rather than :code:`view`. ``view`` fails on
    tensors whose strides are incompatible (e.g. transposed ones); ``reshape``
    copies in that case instead.
    """
    flat_pieces = [tensor.reshape(-1) for tensor in parameters]
    return ch.cat(flat_pieces)
[docs]
def get_num_params(model: torch.nn.Module) -> int:
    """Return the total number of elements across ``model.parameters()``.

    Counts elements directly instead of concatenating all parameters into one
    vector. This avoids allocating a full copy of the weights, and it returns 0
    for a model with no parameters, where concatenating an empty list would
    raise.
    """
    return sum(p.numel() for p in model.parameters())
[docs]
def is_not_buffer(ind, params_dict) -> bool:
    """Tell whether the name stored at ``params_dict[ind]`` is NOT a buffer name.

    Returns False when the name contains any of ``"running_mean"``,
    ``"running_var"`` or ``"num_batches_tracked"`` (the buffer names PyTorch's
    BatchNorm layers register), and True otherwise.
    """
    buffer_markers = ("running_mean", "running_var", "num_batches_tracked")
    entry_name = params_dict[ind]
    return not any(marker in entry_name for marker in buffer_markers)
[docs]
def vectorize(g, arr=None, device="cuda") -> Tensor:
    """
    Flatten a dict of per-example gradients into a ``[batch_size, num_params]`` matrix.

    ``g`` maps names to tensors, e.g. :code:`{name_w0: grad_w0, ..., name_wp: grad_wp}`.
    Each tensor's first dimension is the batch dimension. Every tensor is
    flattened past that dimension and written, in ``g``'s iteration order,
    into consecutive column ranges of the result.

    Args:
        g (dict): name -> tensor of shape ``[batch_size, ...]``.
        arr (Tensor, optional): destination to fill in place. If None, a new
            tensor is allocated on ``device`` with the dtype of ``g``'s first
            tensor.
        device: device used for a freshly allocated ``arr`` and for each
            chunk before it is written.

    Returns:
        Tensor: ``arr`` (the given one or the newly allocated one), filled.
    """
    if arr is None:
        first_grad = g[list(g)[0]]
        n_examples = first_grad.shape[0]
        total_cols = 0
        for grad in g.values():
            assert grad.shape[0] == n_examples
            # shape[0] == n_examples, so numel is an exact multiple.
            total_cols += grad.numel() // n_examples
        arr = ch.empty(
            size=(n_examples, total_cols), dtype=first_grad.dtype, device=device
        )

    col = 0
    for grad in g.values():
        if grad.dim() < 2:
            # One value per example -> a single column.
            width, block = 1, grad.data.reshape(-1, 1)
        else:
            width, block = grad[0].numel(), grad.flatten(start_dim=1).data
        arr[:, col : col + width] = block.to(device)
        col += width
    return arr
[docs]
def get_output_memory(features: Tensor, target_grads: Tensor, target_dtype: type):
    """Return the byte size of ``features @ target_grads.T`` stored as ``target_dtype``.

    The product has ``features.size(0) * target_grads.size(0)`` elements. That
    count is multiplied by the element size of ``target_dtype``, which is read
    off a one-element scratch tensor.
    """
    n_rows = features.size(0)
    n_cols = target_grads.size(0)
    bytes_per_elem = ch.empty((1,), dtype=target_dtype).element_size()
    return n_rows * n_cols * bytes_per_elem
[docs]
def get_free_memory(device):
    """Estimate how many bytes can still be allocated on CUDA ``device``.

    The estimate is the sum of two quantities:

    * memory the driver reports as free on the device
      (:func:`torch.cuda.mem_get_info`), and
    * memory PyTorch's caching allocator has reserved but not handed to any
      tensor (``memory_reserved - memory_allocated``).

    Counting only the second term reports ~0 on a process that has not yet
    cached much memory. That would make callers think nothing fits, even when
    the device is mostly empty.
    """
    device_free, _device_total = ch.cuda.mem_get_info(device)
    reserved = ch.cuda.memory_reserved(device=device)
    allocated = ch.cuda.memory_allocated(device=device)
    cached_unused = reserved - allocated
    return device_free + cached_unused
[docs]
def get_matrix_mult_standard(
    features: Tensor, target_grads: Tensor, target_dtype: type
):
    """Compute ``features @ target_grads.T`` in one shot and cast it to ``target_dtype``.

    The matmul already returns a freshly allocated tensor that shares no
    storage with the inputs, so no defensive copy is made. ``.to`` returns
    that tensor as-is when it already has ``target_dtype`` and converts it
    otherwise. Skipping a ``clone`` keeps peak memory at one output
    (plus one converted copy when the dtype changes).
    """
    output = features @ target_grads.t()
    return output.to(target_dtype)
[docs]
def get_matrix_mult_blockwise(
    features: Tensor, target_grads: Tensor, target_dtype: type, bs: int
):
    """Compute ``features @ target_grads.T`` tile by tile on the GPU.

    The inputs are pinned with ``pin_memory`` (which requires CPU tensors).
    Tiles of at most ``bs x bs`` output entries are then handled one at a time:

    * the matching row blocks are copied to the GPU;
    * they are multiplied there;
    * the tile is staged in a pinned CPU buffer;
    * it is finally copied into a CPU result tensor of dtype ``target_dtype``.

    Work alternates between two CUDA streams.

    Args:
        features (Tensor): left operand, 2-D ``[n, d]``; its rows are tiled.
        target_grads (Tensor): right operand, 2-D ``[m, d]``; its rows are tiled.
        target_dtype (torch.dtype): dtype of the returned matrix.
        bs (int): maximum tile edge length; must be positive. It is clamped
            to ``min(n, m, bs)``.

    Returns:
        Tensor: CPU tensor of shape ``[n, m]``.

    Raises:
        ValueError: if ``bs`` is not positive.
    """
    if bs <= 0:
        raise ValueError(f"bs must be a positive integer, got {bs}")
    s_features = features.shape[0]
    s_target_grads = target_grads.shape[0]
    if s_features == 0 or s_target_grads == 0:
        # Nothing to multiply. Without this early exit ``bs`` would be clamped
        # to 0 below and the tiling arithmetic would divide by zero.
        return ch.empty(
            (s_features, s_target_grads), dtype=target_dtype, device="cpu"
        )
    bs = min(s_features, s_target_grads, bs)
    # Copy the data in a pinned memory location to allow non-blocking
    # copies to the GPU
    features = features.pin_memory()
    target_grads = target_grads.pin_memory()
    # precompute all the blocks we will have to compute
    slices = []
    for i in range(int(np.ceil(s_features / bs))):
        for j in range(int(np.ceil(s_target_grads / bs))):
            slices.append((slice(i * bs, (i + 1) * bs), slice(j * bs, (j + 1) * bs)))
    # Allocate memory for the final output.
    final_output = ch.empty(
        (s_features, s_target_grads), dtype=target_dtype, device="cpu"
    )
    # Output buffers pinned on the CPU to be able to collect data from the
    # GPU asynchronously.
    # For each of our (2) cuda streams we need two output buffers: one
    # is currently written on with the next batch of results and the
    # second one is already finished and getting copied on the final output.
    # If the size is not a multiple of the block size we need extra buffers
    # with the proper (ragged) shapes.
    outputs = [
        ch.zeros((bs, bs), dtype=target_dtype, device=features.device).pin_memory()
        for _ in range(4)
    ]
    left_bottom = s_features % bs
    options = [outputs]  # List of buffers we can potentially use
    if left_bottom:
        # Last row-block of tiles has only ``left_bottom`` rows.
        outputs_bottom = [
            ch.zeros(
                (left_bottom, bs), dtype=target_dtype, device=features.device
            ).pin_memory()
            for _ in range(4)
        ]
        options.append(outputs_bottom)
    left_right = s_target_grads % bs
    if left_right:
        # Last column-block of tiles has only ``left_right`` columns.
        outputs_right = [
            ch.zeros(
                (bs, left_right), dtype=target_dtype, device=features.device
            ).pin_memory()
            for _ in range(4)
        ]
        options.append(outputs_right)
    if left_right and left_bottom:
        outputs_corner = [
            ch.zeros(
                (left_bottom, left_right), dtype=target_dtype, device=features.device
            ).pin_memory()
            for _ in range(4)
        ]
        options.append(outputs_corner)
    streams = [ch.cuda.Stream() for _ in range(2)]
    # The slice that was computed last and needs to now be copied onto the
    # final output
    previous_slice = None

    def find_buffer_for_shape(shape):
        # Return the group of 4 staging buffers whose tiles have ``shape``.
        for buff in options:
            if buff[0].shape == shape:
                return buff
        return None

    for i, (slice_i, slice_j) in enumerate(slices):
        with ch.cuda.stream(streams[i % len(streams)]):
            # Copy the relevant blocks from CPU to the GPU asynchronously
            features_i = features[slice_i, :].cuda(non_blocking=True)
            target_grads_j = target_grads[slice_j, :].cuda(non_blocking=True)
            output_slice = features_i @ target_grads_j.t()
            # Stage the tile in pinned memory; this copy is issued with
            # non_blocking=False, i.e. synchronously.
            find_buffer_for_shape(output_slice.shape)[i % 4].copy_(
                output_slice, non_blocking=False
            )
        # Write the previous batch of data from the temporary buffer
        # onto the final one (note that this was done by the other stream
        # so we swap back to the other one)
        with ch.cuda.stream(streams[(i + 1) % len(streams)]):
            if previous_slice is not None:
                output_slice = final_output[previous_slice[0], previous_slice[1]]
                output_slice.copy_(
                    find_buffer_for_shape(output_slice.shape)[(i - 1) % 4],
                    non_blocking=True,
                )
        previous_slice = (slice_i, slice_j)
    # Wait for all the calculations/copies to be done
    ch.cuda.synchronize()
    # Copy the last chunk to the final result (from the appropriate buffer);
    # ``i`` is the index of the last tile processed by the loop above.
    output_slice = final_output[previous_slice[0], previous_slice[1]]
    output_slice.copy_(
        find_buffer_for_shape(output_slice.shape)[i % 4], non_blocking=True
    )
    return final_output
[docs]
def get_matrix_mult(
    features: Tensor,
    target_grads: Tensor,
    target_dtype: torch.dtype = None,
    batch_size: int = 8096,
    use_blockwise: bool = False,
) -> Tensor:
    """
    Compute ``features @ target_grads.T``, falling back to blocks when needed.

    Strategy:
      * ``use_blockwise=True``: always multiply blockwise (inputs moved to CPU).
      * ``features`` on CPU: a single direct matmul.
      * otherwise (GPU): a direct matmul if the estimated output size is below
        the free-memory estimate for ``features.device``; blockwise if not.

    Args:
        features (Tensor):
            The first matrix to multiply.
        target_grads (Tensor):
            The second matrix to multiply.
        target_dtype (torch.dtype, optional):
            The dtype of the output matrix. If None, defaults to the dtype of
            features. Defaults to None.
        batch_size (int, optional):
            The block size used for blockwise matrix multiplication. Defaults
            to 8096.
        use_blockwise (bool, optional):
            Whether or not to force blockwise matrix multiplication. Defaults
            to False.
    """
    out_dtype = features.dtype if target_dtype is None else target_dtype

    def _blockwise() -> Tensor:
        # The blockwise routine pins its inputs, so hand it CPU copies.
        return get_matrix_mult_blockwise(
            features.cpu(), target_grads.cpu(), out_dtype, batch_size
        )

    if use_blockwise:
        return _blockwise()
    if features.device.type == "cpu":
        return get_matrix_mult_standard(features, target_grads, out_dtype)

    needed = get_output_memory(features, target_grads, out_dtype)
    available = get_free_memory(features.device)
    if needed < available:
        return get_matrix_mult_standard(features, target_grads, out_dtype)
    return _blockwise()
[docs]
def get_parameter_chunk_sizes(
    model: torch.nn.Module,
    batch_size: int,
):
    """Group a model's parameter tensors into chunks small enough to project at once.

    The :class:`CudaProjector` can only project when the product of the
    number of parameters and the batch size stays below a 32-bit limit. The
    limit used here is ``np.iinfo(np.uint32).max // batch_size`` parameters
    per chunk. NOTE(review): an earlier docstring described this bound as
    int32; confirm which one the projector actually requires.

    Parameter tensors are walked in ``model.parameters()`` order and packed
    greedily. A chunk is closed before adding a tensor would make it reach the
    limit. A single tensor that alone reaches the limit cannot be split, so it
    becomes a chunk of its own; no empty chunks are emitted.

    Used in :class:`ChunkedCudaProjector`.

    Returns:
        tuple: ``(max_chunk_size, params_per_chunk)``, where
        ``params_per_chunk`` lists the number of parameters in each chunk
        (a run of consecutive parameter tensors).
    """
    param_shapes = np.array([p.numel() for p in model.parameters()])
    max_chunk_size = np.iinfo(np.uint32).max // batch_size

    params_per_chunk = []
    chunk_sum = 0
    for ps in param_shapes:
        # ``chunk_sum > 0`` prevents emitting an empty chunk when a single
        # tensor is already too large on its own.
        if chunk_sum > 0 and chunk_sum + ps >= max_chunk_size:
            params_per_chunk.append(chunk_sum)
            chunk_sum = 0
        chunk_sum += ps
    if chunk_sum > 0:
        params_per_chunk.append(chunk_sum)
    return max_chunk_size, params_per_chunk