Skip to content

Profile API

model_navigator.profile

profile(func, dataloader, target_formats=None, runners=None, window_size=DEFAULT_WINDOW_SIZE, stability_percentage=DEFAULT_STABILITY_PERCENTAGE, stabilization_windows=DEFAULT_STABILIZATION_WINDOWS, min_trials=DEFAULT_MIN_TRIALS, max_trials=DEFAULT_MAX_TRIALS, throughput_cutoff_threshold=DEFAULT_THROUGHPUT_CUTOFF_THRESHOLD, throughput_backoff_limit=DEFAULT_THROUGHPUT_BACKOFF_LIMIT, device='cuda', initialize=True, verbose=False)

Profile func in scope of which registered modules are executed.

Parameters:

  • func (Callable) –

    Callable to profile.

  • dataloader (Sequence[Tuple[int, Any]]) –

    List of tuples with batch size and input.

  • target_formats (Optional[Tuple[Union[str, Format], ...]], default: None ) –

    Target model formats for the optimize process.

  • runners (Optional[Tuple[Union[str, Type[NavigatorRunner]], ...]], default: None ) –

    Use only runners provided as parameter

  • min_trials (int, default: DEFAULT_MIN_TRIALS ) –

    Minimum number of trials.

  • max_trials (int, default: DEFAULT_MAX_TRIALS ) –

    Maximum number of trials.

  • stabilization_windows (int, default: DEFAULT_STABILIZATION_WINDOWS ) –

    Number of stabilization windows.

  • window_size (int, default: DEFAULT_WINDOW_SIZE ) –

    Number of inference queries performed in a measurement window.

  • stability_percentage (float, default: DEFAULT_STABILITY_PERCENTAGE ) –

    Allowed percentage of variation from the mean in three consecutive windows.

  • throughput_cutoff_threshold (Optional[float], default: DEFAULT_THROUGHPUT_CUTOFF_THRESHOLD ) –

    Minimum throughput increase required to continue profiling. If None is provided, profiling runs through the whole dataloader.

  • throughput_backoff_limit (int, default: DEFAULT_THROUGHPUT_BACKOFF_LIMIT ) –

    Back-off limit: how many additional profiling steps to run, to avoid stopping at a local minimum, when throughput saturates based on throughput_cutoff_threshold.

  • device (str, default: 'cuda' ) –

    Default device used for loading unoptimized model.

  • initialize (bool, default: True ) –

    Whether to initialize pipeline on device before profiling.

  • verbose (bool, default: False ) –

    Provide verbose logging

Source code in model_navigator/inplace/__init__.py
def profile(
    func: Callable,
    dataloader: Sequence[Tuple[int, Any]],
    target_formats: Optional[Tuple[Union[str, Format], ...]] = None,
    runners: Optional[Tuple[Union[str, Type[NavigatorRunner]], ...]] = None,
    window_size: int = DEFAULT_WINDOW_SIZE,
    stability_percentage: float = DEFAULT_STABILITY_PERCENTAGE,
    stabilization_windows: int = DEFAULT_STABILIZATION_WINDOWS,
    min_trials: int = DEFAULT_MIN_TRIALS,
    max_trials: int = DEFAULT_MAX_TRIALS,
    throughput_cutoff_threshold: Optional[float] = DEFAULT_THROUGHPUT_CUTOFF_THRESHOLD,
    throughput_backoff_limit: int = DEFAULT_THROUGHPUT_BACKOFF_LIMIT,
    device: str = "cuda",
    initialize: bool = True,
    verbose: bool = False,
) -> InplaceProfileStatus:
    """Profile `func` in scope of which registered modules are executed.

    Every (model key, runner) pair is profiled independently: a failure while
    loading or profiling one pair is logged, recorded with a FAIL status and
    does not stop the remaining pairs from being profiled.

    Args:
        func:  Callable to profile.
        dataloader: List of tuples with batch size and input.
        target_formats: Target model formats for optimize process
        runners: Use only runners provided as parameter
        min_trials: Minimum number of trials.
        max_trials: Maximum number of trials.
        stabilization_windows: Number of stabilization windows.
        window_size: Number of inference queries performed in measurement window
        stability_percentage: Allowed percentage of variation from the mean in three consecutive windows.
        throughput_cutoff_threshold: Minimum throughput increase to continue profiling. If None is provided,
                                     profiling run through whole dataloader
        throughput_backoff_limit: Back-off limit to run multiple more profiling steps to avoid stop at local minimum
                                  when throughput saturate based on `throughput_cutoff_threshold`.
        device: Default device used for loading unoptimized model.
        initialize: Whether to initialize pipeline on device before profiling.
        verbose: Provide verbose logging

    Returns:
        Profile status built from the results collected for all profiled runtimes.
    """
    log_file = inplace_config.cache_dir / "profiling.log"
    reconfigure_logging_to_file(log_file)

    if target_formats is None:
        target_formats = DEFAULT_TORCH_TARGET_FORMATS_FOR_PROFILING
    if runners is None:
        runners = list(runner_registry.values())

    event_emitter = profile_event_emitter()
    event_emitter.emit(ProfileEvent.PROFILING_STARTED)

    validate_device_string(device)
    modelkeys_runners = _get_modelkeys_runners(target_formats, runners)

    default_modelkeys_runners = [("python", "eager")]
    # Count only the modules that are actually optimized; taking the length of a
    # list of flags would count every registered module regardless of its state.
    optimized_modules_count = sum(1 for m in module_registry.values() if m.is_optimized)
    # NOTE(review): the threshold is kept as in the original code (more than one
    # optimized module) - confirm whether a single optimized module should qualify.
    if optimized_modules_count > 1:
        default_modelkeys_runners += [("navigator", "optimized")]

    modelkeys_runners = default_modelkeys_runners + list(modelkeys_runners)
    LOGGER.info(f"Profiling runners: {modelkeys_runners}")

    profiling_results = ProfilingResults()
    for model_key, runner_name in modelkeys_runners:
        runtime_name = f"{model_key} on {runner_name}"
        event_emitter.emit(ProfileEvent.RUNTIME_PROFILING_STARTED, name=runtime_name)
        try:
            _initialize_modules(
                func=func,
                model_key=model_key,
                runner_name=runner_name,
                device=device,
                initialize=initialize,
                verbose=verbose,
            )
            try:
                runner_profiling_results = RunnerProfilingResults(status=CommandStatus.OK)
                for sample_id, result in _profile_runner(
                    runner_name=runner_name,
                    func=func,
                    dataloader=dataloader,
                    min_trials=min_trials,
                    max_trials=max_trials,
                    stabilization_windows=stabilization_windows,
                    window_size=window_size,
                    stability_percentage=stability_percentage,
                    throughput_cutoff_threshold=throughput_cutoff_threshold,
                    throughput_backoff_limit=throughput_backoff_limit,
                ):
                    runner_profiling_results.detailed[sample_id] = result

                results_str = "\n".join(
                    f"""Batch: {result.batch_size:6}, """
                    f"""Throughput: {result.throughput:10.2f} [infer/sec], """
                    f"""Avg Latency: {result.avg_latency:10.2f} [ms]"""
                    for result in runner_profiling_results.detailed.values()
                )
                LOGGER.info(f"Collected results: \n{results_str}")
                time.sleep(0.1)  # FIXME: WAR to avoid overlapping messages

                for result in runner_profiling_results.detailed.values():
                    event_emitter.emit(ProfileEvent.RUNTIME_PROFILING_RESULT, result=result)

                event_emitter.emit(ProfileEvent.RUNTIME_PROFILING_FINISHED)
            except Exception as e:
                # Profiling failure: drop any partial measurements and record FAIL.
                LOGGER.error(f"Profiling failed for model_key {model_key} and runner {runner_name}.")
                LOGGER.error(str(e))
                if verbose:
                    LOGGER.error(f"Traceback: {traceback.format_exc()}")

                runner_profiling_results = RunnerProfilingResults(status=CommandStatus.FAIL)
                event_emitter.emit(ProfileEvent.RUNTIME_PROFILING_ERROR)
        except Exception as e:
            # Module initialization failure: nothing was measured for this runtime.
            LOGGER.error(f"Loading model failed for model_key {model_key} and runner {runner_name}.")
            LOGGER.error(str(e))
            if verbose:
                LOGGER.error(f"Traceback: {traceback.format_exc()}")

            runner_profiling_results = RunnerProfilingResults(status=CommandStatus.FAIL)
            event_emitter.emit(ProfileEvent.RUNTIME_PROFILING_ERROR)

        # Store the outcome (OK or FAIL) under its model key and runner name,
        # creating the per-model container on first use.
        if model_key not in profiling_results.models:
            profiling_results.models[model_key] = RunnerResults()
        profiling_results.models[model_key].runners[runner_name] = runner_profiling_results

    event_emitter.emit(ProfileEvent.PROFILING_FINISHED)

    return _build_profile_status(profiling_results)