Anchor | ||||
---|---|---|---|---|
|
ステップの処理ロジックを機能させるためには、数多くのメソッドを実装しなくてはいけません。一部のメソッドは、ロウ(行)ステップか、またはキャッシュステップかに応じて異なります。
...
メソッド | 説明と例 | |||||||||||||||||||||||||||||||||||||||||
public ETLStepAPIVersion getAPIVersion() | ステップの更新を維持するYellowfinステップAPIバージョンの定義に使用するメソッドです。これは通常、列挙型ETLStepAPIVersion内の最新のバージョンです。APIバージョンは、互換性を判断するために使用されます。
| |||||||||||||||||||||||||||||||||||||||||
public Collection<ETLException> validate() | 事前実行検証を行うメソッドです。実装は、必須オプションが設定されているか、資格情報は正確か、ホストに到達可能かを確認しなくてはいけません。エラーは、ETLExceptionのインスタンスに取得され、メソッドから返されるコレクションに追加されます。または、ETLExceptionを構築する代わりに、便宜メソッドであるgetInvalidConfigETLException()を使用することもできます。
| |||||||||||||||||||||||||||||||||||||||||
public Map<String, String> getValidatedStepOptions() | ステップオプションを有効にするメソッドです。optionKeyからoptionValueへのマッピングは、Yellowfinリポジトリデータベースに保存されます。こちらで、無効なオプション値を削除することができます。this.getStepOptions()により返されるマッピングの操作には影響しません。
| |||||||||||||||||||||||||||||||||||||||||
public void setupGeneratedFields() | ステップが新規フィールドの出力を必要とする場合は、こちらのメソッドを実装します。新規フィールド内のデータは、他のフィールドを使用して生成されます。新規フィールドは既存フィールドを置き換えるか、複製する場合もあります。Yellowfinは、各操作に便宜メソッドを提供します。こちらのメソッドは、ETLStepMetadataFieldBeanの新規インスタンスを作成するか、既存フィールドを複製することが予想されます。フィールドが事前に設定されていないか、再度設定する必要がある場合にのみ、こちらのメソッドを実行することが重要です。オプション内の変更が原因でフィールドが再作成された場合は、古いフィールドを削除しなくてはいけません。削除をしない場合、ステップが再設定される度に新規フィールドが生成されることになります。
| |||||||||||||||||||||||||||||||||||||||||
public Integer getMinInputSteps() public Integer getMaxInputSteps() public Integer getMinOutputSteps() public Integer getMaxOutputSteps() | ステップに複数のインプットやアウトプットがある場合、これらのメソッドを上書きしなくてはいけません。Yellowfinは、ステップカテゴリーに基づき最小値/最大値を返すデフォルト実装を提供します。これらの値は、これらのカテゴリーのETLStepCategory列挙型要素内で定義されます。 | |||||||||||||||||||||||||||||||||||||||||
YFLogger | これはメソッドではありませんが、すべてのステップに共通します。ステップは、YFLoggerを使用して、データトランスフォーメーションログに書き込むことができます。これは、インスタンス変数として宣言する必要があります。YFLoggerは、log4jのロガーclassのラッパです。
|
...
ロウ(行)ステップ実装
ロウ(行)ステップは、AbstractETLRowStep classを拡張します。これには、ひとつのメソッド(processWireData())の実装のみが必要です。
processWireData()
フレームワークが、processWireData()を呼び出すと、現在のロウ(行)の各カラム(列)からのデータは、すでに適切なWire上にあります。各wireはメタデータフィールドにマッピングされ、this.getWireForField(fieldUUID)を使用してアクセスされます。データはwireから取得され、処理され、同一のwire、または別のwireに戻されます。
- メソッドパラメーター:メソッドはひとつのパラメーター(List<ETLStepMetadataFieldBean>)を持ちます。これらは、インプットステップからのフィールドです。これらは、便宜のために提供されます。これらはwireを取得するために使用されるかもしれませんが、以下に示すように、デフォルトメタデータフィールドの使用をお勧めします。
- 戻り値:戻り値はbooleanであり、データのロウ(行)を次のステップに出力すべきかどうかを示します。例えばフィルターステップの場合、ロウ(行)内のデータはフィルター条件を満たさないため、falseが返されます。
- 例外:ETLExceptionとInterruptedExceptionをthrowするメソッドです。処理エラーがある場合は、ETLExceptionのインスタンスにラップし、throwすることで、Yellowfinはそれをユーザーに表示することができます。便宜メソッド(this.throwUnhandledETLException(e))を使用することもできます。例外はcatchや、swallowをしてはいけません。interruptedExceptionもcatchされるため、java.lang.Exceptionのcatchはお勧めできません。これが避けられない場合は、InterruptedExceptionをcatchし、異なるcatchブロックにthrowしなくてはいけません。
コードの例
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
} catch (InterruptedException e) { throw e; } catch (Exception e) { log.error("Error: " + e, e); throwUnhandledETLException(e); } |
...
こちらは、特定のフィールドに数値を追加する実装の例です。
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@Override protected boolean processWireData(List<ETLStepMetadataFieldBean> fields) throws ETLException, InterruptedException { // The options should've been validated by the validate() method, // so no need for further checks here String appendFieldUUID = this.getStepOption("APPEND_FIELD"); String newFieldUUID = this.getStepOption("NEW_FIELD"); String appendValue = this.getStepOption("APPEND_VALUE"); Wire<Object, String> appendFieldWire = this.getWireForField(appendFieldUUID); Wire<Object, String> newFieldWire = this.getWireForField(newFieldUUID); Object data = appendFieldWire.getValue(); String newFieldData = null; if (data == null) { newFieldData = appendValue; } else { newFieldData = data.toString() + appendValue; } newFieldWire.send(newFieldData); return true; } |
こちらの例では、すべてのロウ(行)のデータでprocessWireData()が実行されます。実際には、後続のメソッドの呼び出しで変更されないオブジェクトは、メンバー変数にキャッシュしなくてはいけません。例えば、appendFieldUUID、newFieldUUID、appendValue、appnedFieldWire、newFieldWireはメンバー変数にすべきであり、processWoreData()が初めて実行された時にだけ設定されます。
キャッシュステップ実装
キャッシュステップは、AbstractETLCachedStepを拡張します。ひとつのメソッド(processEndRows())のみ実装しなくてはいけません。キャッシュステップは通常、ひとつ以上のインプットステップを持ちます。データ取得ステップはインプットを持たないため、キャッシュステップとして実装されることが多いです。例えばこれは、SQLクエリーの実行によりデータを生成します。
processEndRow()
- インプットステップ:フレームワークは、プロセスが実行が開始されるとすぐに、インプットステップにprocessEndRows()を呼び出します。インプットステップは、データキャッシングを気にする必要がありません。しかし、実装はwireにデータを配置し、その出力へ送らなくてはいけません。こちらは、データ生成ステップのメソッドの実装例です。
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@Override protected void processEndRows() throws ETLException, InterruptedException { // Get the first output flow; // Useful for most steps which have a single output String outFlow = getFirstOutputFlow(); // The step outputs four Generated fields. List<String> orderedFieldUUIDs = new ArrayList<>(4); // The step's implementation of setupGeneratedFields() // should set these up. Their UUIDs would've been saved as step options. orderedFieldUUIDs.add(getStepOption("FIELD1_UUID")); orderedFieldUUIDs.add(getStepOption("FIELD2_UUID")); orderedFieldUUIDs.add(getStepOption("FIELD3_UUID")); orderedFieldUUIDs.add(getStepOption("FIELD4_UUID")); // Sample Data String[] field1_data = {"Adventure", "Relaxation", "Culture", "Family"}; int[] field2_data = {30, 32, 11, 44}; Date[] field3_data = {new Date(103882823L), new Date(10388283323L), new Date(103883232823L), new Date(102323882823L)}; Timestamp[] field4_data = {new Timestamp(103882823L), new Timestamp(10388283323L), new Timestamp(103883232823L), new Timestamp(102323882823L)}; // Generate as many rows as configured in Step Option ROW_COUNT int rowCount = 10;//Integer.parseInt(getStepOption("ROW_COUNT")); Random random = new Random(); for (int i = 0 ; i < rowCount ; i++) { // Data is emitted in packets. // This implementation creates a new packet for every row. // Data packets can accumulate rows and emit, say, every 20 rows. ETLStepResult dataPacket = getFreshDataPacket(outFlow); Object[] row = new Object[4]; row[0] = field1_data[random.nextInt(4)]; row[1] = field2_data[random.nextInt(4)]; row[2] = field3_data[random.nextInt(4)]; row[3] = field4_data[random.nextInt(4)]; // Send the row of data from Default Fields to Output Fields beginInternalTransmission(row, orderedFieldUUIDs); // Accumulate transmitted data in a data packet endInternalTransmission(dataPacket); // Emit the packet of data to the next step. // This may be done less frequently, after accumulating rows emitData(dataPacket); } } |
- トランスフォーメーションステップ:フレームワークは、すべてのインプットがキャッシュステップへのデータ送信を終了した時に、キャッシュトランスフォーメーションステップにprocessEndRows()を呼び出します。各インプットステップのデータは、異なるメモリーキャッシュに保存されます。ステップ実装は必ずwireにデータを送信し、ステップからのデータをそのアウトプットへ送らなくてはいけません。以下の例は、Union-Allステップのprocを実装し、データキャッシュがどのように使用されるのかを示しています。
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@Override protected void processEndRows() throws ETLException, InterruptedException { // Get input flows which feed data to this step Set<String> inputFlowUuids = this.getInputFlowUuids(); // Get the output flow as there can be only one String outFlowUuid = getFirstOutputFlow(); // Get a data packet ETLStepResult dataPacket = getFreshDataPacket(outFlowUuid); for (String inputFlowUuid : inputFlowUuids) { // Get data of each input from its cache ETLDataCache inputData = getDataCache(inputFlowUuid); // Use this to get the Default Metadata Field corresponding to an Input Field. // Input Metadata Field is the field in the input step. // Cached data will be in the order of input fields. Map<String, String> inputToDefaultFieldMap = getInputToDefaultFieldMap(); // The data will match the Input Metadata Fields List<ETLStepMetadataFieldBean> inputFieldList = inputData.getMetadataFields(); List<String> unionResultFields = new ArrayList<String>(); for(ETLStepMetadataFieldBean fieldBean : inputFieldList){ // Get the Default Metadata Field for an Input Metadata Field String inputFieldUuid = fieldBean.getEtlStepMetadataFieldUUID(); String defaultFieldUuid = inputToDefaultFieldMap.get(inputFieldUuid); // Get the Generated Default Metadata Field holding the result of the Union. // getUnionFieldForDefaultField() is a method defined in the Step. // The result of the union operation is sent to new generated fields. // It figures out how a Default Field is linked to the generated field. String unionResultField = getUnionFieldForDefaultField(defaultFieldUuid); // This holds the Union field corresponding to the Input Field. // Fields which are excluded from the union will have a null entry. unionResultFields.add(unionResultField); } // Iterate through the cached data Iterator<Object[]> it = inputData.iterator(); while (it.hasNext()) { Object[] row = it.next(); // Transmit data from: // input fields -> default -> generated default (union fields) -> output // Data excluded from the union will have a "null" field, // so nothing will be transmitted. this.beginInternalTransmission(row, unionResultFields); this.endInternalTransmission(dataPacket); } } // Accumulate all data before emitting to the next step emitData(dataPacket); } |
...