Coverage for src/amisc/transform.py: 83%

98 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-05 19:26 +0000

1"""Module for data transformation methods. 

2 

3Includes: 

4 

5- `Transform` — an abstract interface for specifying a transformation. 

6- `Linear` — a linear transformation $y=mx+b$. 

7- `Log` — a logarithmic transformation $y=\\log_b(x + \\mathrm{offset})$. 

8- `Minmax` — a min-max scaling transformation $x: (lb, ub) \\mapsto (lb_{norm}, ub_{norm})$. 

9- `Zscore` — a z-score normalization transformation $y=(x-\\mu)/\\sigma$. 

10 

11Transform objects can be converted easily to/from strings for serialization. 

12""" 

13from __future__ import annotations 

14 

15from abc import ABC, abstractmethod 

16 

17import numpy as np 

18from numpy.typing import ArrayLike 

19 

20from amisc.utils import parse_function_string 

21 

22__all__ = ['Transform', 'Linear', 'Log', 'Minmax', 'Zscore'] 

23 

24 

25class Transform(ABC): 

26 """A base class for all transformations. 

27 

28 :ivar transform_args: the arguments for the transformation 

29 :vartype transform_args: tuple 

30 """ 

31 

32 def __init__(self, transform_args: tuple): 

33 self.transform_args = transform_args 

34 

35 def __str__(self): 

36 """Serialize a `Transform` object to string.""" 

37 return f'{type(self).__name__}{self.transform_args}' 

38 

39 @classmethod 

40 def from_string(cls, transform_spec: str | list[str]) -> list[Transform] | None: 

41 """Return a list of `Transforms` given a list of string specifications. Available transformations are: 

42 

43 - **linear** — $x_{norm} = mx + b$ specified as `linear(m, b)` or `linear(slope=m, offset=b)`. `m=1, b=0` if not 

44 specified. 

45 - **log** — $x_{norm} = \\log_b(x)$ specified as `log` or `log10` for the natural or common logarithms. For a 

46 different base, use `log(b)`. Optionally, specify `offset` for `log(x+offset)`. 

47 - **minmax** — $x_{norm} = \\frac{x - a}{b - a}(u - l) + l$ specified as `minmax(a, b, l, u)` or 

48 `minmax(lb=a, ub=b, lb_norm=l, ub_norm=u)`. Scales `x` from the range `(a, b)` to `(l, u)`. By 

49 default, `(a, b)` is the Variable's domain and `(l, u)` is `(0, 1)`. Use simply as `minmax` 

50 to use all defaults. 

51 - **zscore** — $x_{norm} = \\frac{x - m}{s}$ specified as `zscore(m, s)` or `zscore(mu=m, std=s)`. If the 

52 Variable is specified as `distribution=normal`, then `zscore` defaults to the Variable's 

53 own `mu, std`. 

54 

55 !!! Example 

56 ```python 

57 transforms = Transform.from_string(['log10', 'linear(2, 4)']) 

58 print(transforms) 

59 ``` 

60 will give 

61 ```shell 

62 [ Log(10), Linear(2, 4) ] # the corresponding `Transform` objects 

63 ``` 

64 

65 !!! Warning 

66 You may optionally leave the `minmax` arguments blank to defer to the bounds of the parent `Variable`. 

67 You may also optionally leave the `zscore` arguments blank to defer to the `(mu, std)` of the parent 

68 `Variable`, but this will throw a runtime error if `Variable.distribution` is not `Normal(mu, std)`. 

69 """ 

70 if transform_spec is None: 

71 return None 

72 if isinstance(transform_spec, str | Transform): 

73 transform_spec = [transform_spec] 

74 

75 transforms = [] 

76 for spec_string in transform_spec: 

77 if isinstance(spec_string, Transform): 

78 transforms.append(spec_string) 

79 continue 

80 

81 name, args, kwargs = parse_function_string(spec_string) 

82 if name.lower() == 'linear': 

83 try: 

84 slope = float(kwargs.get('slope', args[0] if len(args) > 0 else 1)) 

85 offset = float(kwargs.get('offset', args[1] if len(args) > 1 else 0)) 

86 transforms.append(Linear((slope, offset))) 

87 except Exception as e: 

88 raise ValueError(f'Linear transform spec "{spec_string}" is not valid: Try "linear(m, b)".') from e 

89 elif name.lower() in ['log', 'log10']: 

90 try: 

91 log_base = float(kwargs.get('base', args[0] if len(args) > 0 else (np.e if name.lower() == 'log' 

92 else 10))) 

93 offset = float(kwargs.get('offset', args[1] if len(args) > 1 else 0)) 

94 transforms.append(Log((log_base, offset))) 

95 except Exception as e: 

96 raise ValueError(f'Log transform spec "{spec_string}" is not valid: Try "log(base, offset)"') from e 

97 elif name.lower() in ['minmax', 'maxabs']: 

98 try: 

99 # Defer bounds to the Variable by setting np.nan 

100 lb = float(kwargs.get('lb', args[0] if len(args) > 0 else np.nan)) 

101 ub = float(kwargs.get('ub', args[1] if len(args) > 1 else np.nan)) 

102 lb_norm = float(kwargs.get('lb_norm', args[2] if len(args) > 2 else 0)) 

103 ub_norm = float(kwargs.get('ub_norm', args[3] if len(args) > 3 else 1)) 

104 transforms.append(Minmax((lb, ub, lb_norm, ub_norm))) 

105 except Exception as e: 

106 raise ValueError(f'Minmax transform spec "{spec_string}" is not valid: Try "minmax(lb, ub)"') from e 

107 elif name.lower() in ['z', 'zscore']: 

108 try: 

109 # Defer (mu, std) to the Variable by setting np.nan 

110 mu = float(kwargs.get('mu', args[0] if len(args) > 0 else np.nan)) 

111 std = float(kwargs.get('std', args[1] if len(args) > 1 else np.nan)) 

112 transforms.append(Zscore((mu, std))) 

113 except Exception as e: 

114 raise ValueError(f'Z-score normalization string "{spec_string}" is not valid: ' 

115 f'Try "zscore(mu, std)".') from e 

116 else: 

117 raise NotImplementedError(f'Transform method "{name}" is not implemented.') 

118 

119 return transforms 

120 

121 def transform(self, x: ArrayLike, inverse: bool = False, transform_args: tuple = None) -> ArrayLike: 

122 """Transform the given values `x`. This wrapper function handles the input type and tries to 

123 return the transformed values in the same type. 

124 

125 :param x: the values to transform 

126 :param inverse: whether to do the inverse transform instead 

127 :param transform_args: overrides `Transform.transform_args` 

128 :return: the transformed values 

129 """ 

130 input_type = type(x) 

131 result = self._transform(np.atleast_1d(x), inverse, transform_args) 

132 if input_type in [int, float]: 

133 return float(result[0]) 

134 elif input_type is list: 

135 return result.tolist() 

136 elif input_type is tuple: 

137 return tuple(result.tolist()) 

138 else: 

139 return result # just keep as np.ndarray for everything else 

140 

141 @abstractmethod 

142 def _transform(self, x, inverse=False, transform_args=None): 

143 """Abstract method that subclass `Transform` objects should implement. 

144 

145 :param x: the values to transform 

146 :param inverse: whether to do the inverse transform instead 

147 :param transform_args: overrides `Transform.transform_args` 

148 :return: the transformed values 

149 """ 

150 raise NotImplementedError 

151 

152 

class Linear(Transform):
    """A Linear transform: $y=mx+b$.

    :ivar transform_args: `(m, b)` the slope and offset
    """
    def _transform(self, x, inverse=False, transform_args=None):
        """Apply $y=mx+b$, or its inverse $x=(y-b)/m$.

        :param x: the values to transform
        :param inverse: whether to do the inverse transform instead
        :param transform_args: `(m, b)` overriding `self.transform_args`; ignored if `None` or empty
        :return: the transformed values
        """
        # Explicit None/empty check instead of `or`, so that array-like overrides are not truth-tested
        if transform_args is None or len(transform_args) == 0:
            transform_args = self.transform_args
        slope, offset = transform_args
        if inverse:
            return (x - offset) / slope
        return slope * x + offset

161 

162 

class Log(Transform):
    """A Log transform: $y=\\log_b(x + \\mathrm{offset})$.

    :ivar transform_args: `(base, offset)` the log base and offset
    """
    def _transform(self, x, inverse=False, transform_args=None):
        """Apply $y=\\log_b(x + \\mathrm{offset})$, or its inverse $x=b^y - \\mathrm{offset}$.

        :param x: the values to transform
        :param inverse: whether to do the inverse transform instead
        :param transform_args: `(base, offset)` overriding `self.transform_args`; ignored if `None` or empty
        :return: the transformed values
        """
        # Explicit None/empty check instead of `or`, so that array-like overrides are not truth-tested
        if transform_args is None or len(transform_args) == 0:
            transform_args = self.transform_args
        log_base, offset = transform_args
        if inverse:
            # b**x written as exp(x * ln(b))
            return np.exp(x * np.log(log_base)) - offset
        # Change of base: log_b(z) = ln(z) / ln(b)
        return np.log(x + offset) / np.log(log_base)

171 

172 

class Minmax(Transform):
    """A Minmax transform: $x: (lb, ub) \\mapsto (lb_{norm}, ub_{norm})$.

    :ivar transform_args: `(lb, ub, lb_norm, ub_norm)` the original lower and upper bounds and the normalized bounds
    """
    def _transform(self, x, inverse=False, transform_args=None):
        """Linearly rescale `x` from `(lb, ub)` to `(lb_norm, ub_norm)`, or the reverse if `inverse`.

        :param x: the values to transform
        :param inverse: whether to do the inverse transform instead
        :param transform_args: `(lb, ub, lb_norm, ub_norm)` overriding `self.transform_args`; ignored if `None`
                               or empty
        :return: the transformed values
        :raises RuntimeError: if any of the transform arguments is `nan`
        """
        # Explicit None/empty check instead of `or`, so that array-like overrides are not truth-tested
        if transform_args is None or len(transform_args) == 0:
            transform_args = self.transform_args
        if np.any(np.isnan(transform_args)):
            raise RuntimeError(f'Transform args may have missing values: {transform_args}')
        lb, ub, lb_norm, ub_norm = transform_args
        if inverse:
            return (x - lb_norm) / (ub_norm - lb_norm) * (ub - lb) + lb
        return (x - lb) / (ub - lb) * (ub_norm - lb_norm) + lb_norm

    def update(self, lb=None, ub=None, lb_norm=None, ub_norm=None):
        """Update the parameters of this transform. Arguments left as `None` keep their current value.

        :param lb: the lower bound in the original variable space
        :param ub: the upper bound in the original variable space
        :param lb_norm: the lower bound of the transformed space
        :param ub_norm: the upper bound of the transformed space
        """
        new_args = (lb, ub, lb_norm, ub_norm)
        self.transform_args = tuple(ele if ele is not None else self.transform_args[i]
                                    for i, ele in enumerate(new_args))

199 

200 

class Zscore(Transform):
    """A Zscore transform: $y=(x-\\mu)/\\sigma$.

    :ivar transform_args: `(mu, std)` the mean and standard deviation
    """
    def _transform(self, x, inverse=False, transform_args=None):
        """Apply $y=(x-\\mu)/\\sigma$, or its inverse $x=y\\sigma+\\mu$.

        :param x: the values to transform
        :param inverse: whether to do the inverse transform instead
        :param transform_args: `(mu, std)` overriding `self.transform_args`; ignored if `None` or empty
        :return: the transformed values
        :raises RuntimeError: if any of the transform arguments is `nan`
        """
        # Explicit None/empty check instead of `or`, so that array-like overrides are not truth-tested
        if transform_args is None or len(transform_args) == 0:
            transform_args = self.transform_args
        if np.any(np.isnan(transform_args)):
            raise RuntimeError(f'Transform args may have missing values: {transform_args}')
        mu, std = transform_args
        if inverse:
            return x * std + mu
        return (x - mu) / std

    def update(self, mu=None, std=None):
        """Update the parameters of this transform. Arguments left as `None` keep their current value.

        :param mu: the mean of the transform
        :param std: the standard deviation of the transform
        """
        new_args = (mu, std)
        self.transform_args = tuple(ele if ele is not None else self.transform_args[i]
                                    for i, ele in enumerate(new_args))